This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5ec695b [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions (#7160) 5ec695b is described below commit 5ec695b9397991996fb215a6f18f33e72d860e53 Author: Jeff Klukas <j...@klukas.net> AuthorDate: Fri Dec 14 15:13:46 2018 -0500 [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions (#7160) * [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions Also provides an equivalent superclass for SimpleFunction. See https://issues.apache.org/jira/browse/BEAM-6150 * Update MapElements docs to remove Java 7 references * Update tests to use ProcessFunction * Include tests for both existing and new classes * Remove Java 7 reference in ptransform style guide --- .../org/apache/beam/sdk/transforms/Contextful.java | 4 +- .../org/apache/beam/sdk/transforms/Filter.java | 23 +-- .../beam/sdk/transforms/FlatMapElements.java | 40 +++-- ...{SimpleFunction.java => InferableFunction.java} | 49 ++++--- .../apache/beam/sdk/transforms/MapElements.java | 36 ++--- ...ializableFunction.java => ProcessFunction.java} | 17 ++- .../beam/sdk/transforms/SerializableFunction.java | 11 +- .../apache/beam/sdk/transforms/SimpleFunction.java | 38 +---- .../org/apache/beam/sdk/transforms/ToString.java | 8 +- .../apache/beam/sdk/values/TypeDescriptors.java | 40 ++--- .../org/apache/beam/sdk/transforms/FilterTest.java | 17 +++ .../beam/sdk/transforms/FlatMapElementsTest.java | 60 +++++++- .../beam/sdk/transforms/MapElementsTest.java | 163 ++++++++++++++++++++- website/src/contribute/ptransform-style-guide.md | 4 +- 14 files changed, 358 insertions(+), 152 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java index 7e788cf..97a994f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java @@ -104,11 +104,11 @@ public final class Contextful<ClosureT> implements Serializable { } /** - * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link + * Wraps a {@link ProcessFunction} as a {@link Contextful} of {@link Fn} with empty {@link * Requirements}. */ public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn( - final SerializableFunction<InputT, OutputT> fn) { + final ProcessFunction<InputT, OutputT> fn) { return new Contextful<>((element, c) -> fn.apply(element), Requirements.empty()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java index 4bffeb6..aa9d2cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java @@ -32,7 +32,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { /** * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code * PCollection<T>} with elements that satisfy the given predicate. The predicate must be a {@code - * SerializableFunction<T, Boolean>}. + * ProcessFunction<T, Boolean>}. * * <p>Example of use: * @@ -46,7 +46,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * #greaterThanEq}, which return elements satisfying various inequalities with the specified value * based on the elements' natural ordering. */ - public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> by( + public static <T, PredicateT extends ProcessFunction<T, Boolean>> Filter<T> by( PredicateT predicate) { return new Filter<>(predicate); } @@ -71,7 +71,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> lessThan(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) < 0) .described(String.format("x < %s", value)); } @@ -95,7 +95,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) > 0) .described(String.format("x > %s", value)); } @@ -119,7 +119,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) <= 0) .described(String.format("x ≤ %s", value)); } @@ -143,7 +143,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) >= 0) .described(String.format("x ≥ %s", value)); } @@ -166,20 +166,20 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> equal(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) == 0) .described(String.format("x == %s", value)); } /////////////////////////////////////////////////////////////////////////////// - private SerializableFunction<T, Boolean> predicate; + private ProcessFunction<T, Boolean> predicate; private String predicateDescription; - private Filter(SerializableFunction<T, Boolean> predicate) { + private Filter(ProcessFunction<T, Boolean> predicate) { this(predicate, "Filter.predicate"); } - private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescription) { + private Filter(ProcessFunction<T, Boolean> predicate, String predicateDescription) { this.predicate = predicate; this.predicateDescription = predicateDescription; } @@ -199,7 +199,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { ParDo.of( new DoFn<T, T>() { @ProcessElement - public void processElement(@Element T element, OutputReceiver<T> r) { + public void processElement(@Element T element, OutputReceiver<T> r) + throws Exception { if (predicate.apply(element)) { r.output(element); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index edf255a..93fc85a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -52,29 +52,29 @@ public class FlatMapElements<InputT, OutputT> } /** - * For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a {@link - * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>} - * and outputs all of the elements to the output {@code PCollection<OutputT>}. + * For a {@code InferableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a + * {@link PTransform} that applies {@code fn} to every element of the input {@code + * PCollection<InputT>} and outputs all of the elements to the output {@code + * PCollection<OutputT>}. * - * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link - * #via(SerializableFunction)} supports use of lambda for greater concision. + * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it + * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and + * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}. * - * <p>Example of use in Java 7: + * <p>Example usage: * * <pre>{@code * PCollection<String> lines = ...; * PCollection<String> words = lines.apply(FlatMapElements.via( - * new SimpleFunction<String, List<String>>() { - * public Integer apply(String line) { + * new InferableFunction<String, List<String>>() { + * public Integer apply(String line) throws Exception { * return Arrays.asList(line.split(" ")); * } * }); * }</pre> - * - * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}. */ public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via( - SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) { + InferableFunction<? super InputT, ? extends Iterable<OutputT>> fn) { Contextful<Fn<InputT, Iterable<OutputT>>> wrapped = (Contextful) Contextful.fn(fn); TypeDescriptor<OutputT> outputType = TypeDescriptors.extractFromTypeParameters( @@ -87,7 +87,7 @@ public class FlatMapElements<InputT, OutputT> /** * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output - * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}. + * type, but the mapping function yet to be specified using {@link #via(ProcessFunction)}. */ public static <OutputT> FlatMapElements<?, OutputT> into( final TypeDescriptor<OutputT> outputType) { @@ -95,29 +95,25 @@ public class FlatMapElements<InputT, OutputT> } /** - * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a - * {@link PTransform} that applies {@code fn} to every element of the input {@code - * PCollection<InputT>} and outputs all of the elements to the output {@code - * PCollection<OutputT>}. + * For a {@code ProcessFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a {@link + * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>} + * and outputs all of the elements to the output {@code PCollection<OutputT>}. * - * <p>Example of use in Java 8: + * <p>Example usage: * * <pre>{@code * PCollection<String> words = lines.apply( * FlatMapElements.into(TypeDescriptors.strings()) * .via((String line) -> Arrays.asList(line.split(" "))) * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. */ public <NewInputT> FlatMapElements<NewInputT, OutputT> via( - SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) { + ProcessFunction<NewInputT, ? extends Iterable<OutputT>> fn) { return new FlatMapElements<>( (Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType); } - /** Like {@link #via(SerializableFunction)}, but allows access to additional context. */ + /** Like {@link #via(ProcessFunction)}, but allows access to additional context. */ @Experimental(Experimental.Kind.CONTEXTFUL) public <NewInputT> FlatMapElements<NewInputT, OutputT> via( Contextful<Fn<NewInputT, Iterable<OutputT>>> fn) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java similarity index 64% copy from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java index e3f3cc8..d9dc864 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java @@ -24,28 +24,31 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; /** - * A {@link SerializableFunction} which is not a <i>functional interface</i>. Concrete subclasses - * allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder - * Coder} inference. + * A {@link ProcessFunction} which is not a <i>functional interface</i>. Concrete subclasses allow + * us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder Coder} + * inference. + * + * <p>See {@link SimpleFunction} for providing robust type information where a {@link + * SerializableFunction} is required. */ -public abstract class SimpleFunction<InputT, OutputT> - implements SerializableFunction<InputT, OutputT>, HasDisplayData { +public abstract class InferableFunction<InputT, OutputT> + implements ProcessFunction<InputT, OutputT>, HasDisplayData { - @Nullable private final SerializableFunction<InputT, OutputT> fn; + @Nullable private final ProcessFunction<InputT, OutputT> fn; - protected SimpleFunction() { + protected InferableFunction() { this.fn = null; // A subclass must override apply if using this constructor. Check that via // reflection. try { Method methodThatMustBeOverridden = - SimpleFunction.class.getDeclaredMethod("apply", new Class[] {Object.class}); + InferableFunction.class.getDeclaredMethod("apply", new Class[] {Object.class}); Method methodOnSubclass = getClass().getMethod("apply", new Class[] {Object.class}); if (methodOnSubclass.equals(methodThatMustBeOverridden)) { throw new IllegalStateException( - "Subclass of SimpleFunction must override 'apply' method" - + " or pass a SerializableFunction to the constructor," + "Subclass of InferableFunction must override 'apply' method" + + " or pass a ProcessFunction to the constructor," + " usually via a lambda or method reference."); } @@ -54,24 +57,24 @@ public abstract class SimpleFunction<InputT, OutputT> } } - protected SimpleFunction(SerializableFunction<InputT, OutputT> fn) { + protected InferableFunction(ProcessFunction<InputT, OutputT> fn) { this.fn = fn; } @Override - public OutputT apply(InputT input) { + public OutputT apply(InputT input) throws Exception { return fn.apply(input); } public static <InputT, OutputT> - SimpleFunction<InputT, OutputT> fromSerializableFunctionWithOutputType( - SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { - return new SimpleFunctionWithOutputType<>(fn, outputType); + InferableFunction<InputT, OutputT> fromProcessFunctionWithOutputType( + ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { + return new InferableFunctionWithOutputType<>(fn, outputType); } /** * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of - * this {@link SimpleFunction} instance's most-derived class. + * this {@link InferableFunction} instance's most-derived class. * * <p>See {@link #getOutputTypeDescriptor} for more discussion. */ @@ -81,9 +84,9 @@ public abstract class SimpleFunction<InputT, OutputT> /** * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of - * this {@link SimpleFunction} instance's most-derived class. + * this {@link InferableFunction} instance's most-derived class. * - * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type + * <p>In the normal case of a concrete {@link InferableFunction} subclass with no generic type * parameters of its own (including anonymous inner classes), this will be a complete non-generic * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code * PCollection<OutputT>}. @@ -102,16 +105,16 @@ public abstract class SimpleFunction<InputT, OutputT> public void populateDisplayData(DisplayData.Builder builder) {} /** - * A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type + * A {@link InferableFunction} built from a {@link ProcessFunction}, having a known output type * that is explicitly set. */ - private static class SimpleFunctionWithOutputType<InputT, OutputT> - extends SimpleFunction<InputT, OutputT> { + private static class InferableFunctionWithOutputType<InputT, OutputT> + extends InferableFunction<InputT, OutputT> { private final TypeDescriptor<OutputT> outputType; - public SimpleFunctionWithOutputType( - SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { + public InferableFunctionWithOutputType( + ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { super(fn); this.outputType = outputType; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 41aac41..dc73cf8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -50,64 +50,58 @@ public class MapElements<InputT, OutputT> } /** - * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that + * For {@code InferableFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>} * containing {@code fn.apply(v)} for every element {@code v} in the input. * - * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link - * #via(SerializableFunction)} supports use of lambda for greater concision. + * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it + * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and + * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}. * - * <p>Example of use in Java 7: + * <p>Example usage: * * <pre>{@code * PCollection<String> words = ...; * PCollection<Integer> wordsPerLine = words.apply(MapElements.via( - * new SimpleFunction<String, Integer>() { - * public Integer apply(String word) { + * new InferableFunction<String, Integer>() { + * public Integer apply(String word) throws Exception { * return word.length(); * } * })); * }</pre> */ public static <InputT, OutputT> MapElements<InputT, OutputT> via( - final SimpleFunction<InputT, OutputT> fn) { + final InferableFunction<InputT, OutputT> fn) { return new MapElements<>( Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor()); } /** * Returns a new {@link MapElements} transform with the given type descriptor for the output type, - * but the mapping function yet to be specified using {@link #via(SerializableFunction)}. + * but the mapping function yet to be specified using {@link #via(ProcessFunction)}. */ public static <OutputT> MapElements<?, OutputT> into(final TypeDescriptor<OutputT> outputType) { return new MapElements<>(null, null, null, outputType); } /** - * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor, - * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a - * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the - * input. + * For a {@code ProcessFunction<InputT, OutputT>} {@code fn} and output type descriptor, returns a + * {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a {@code + * PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the input. * - * <p>Example of use in Java 8: + * <p>Example usage: * * <pre>{@code * PCollection<Integer> wordLengths = words.apply( * MapElements.into(TypeDescriptors.integers()) * .via((String word) -> word.length())); * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. */ - public <NewInputT> MapElements<NewInputT, OutputT> via( - SerializableFunction<NewInputT, OutputT> fn) { + public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn) { return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType); } - /** - * Like {@link #via(SerializableFunction)}, but supports access to context, such as side inputs. - */ + /** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */ @Experimental(Kind.CONTEXTFUL) public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) { return new MapElements<>( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java similarity index 51% copy from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java index b2ac9ed..b0e8807 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java @@ -23,10 +23,23 @@ import java.io.Serializable; * A function that computes an output value of type {@code OutputT} from an input value of type * {@code InputT} and is {@link Serializable}. * + * <p>This is the most general function type provided in this SDK, allowing arbitrary {@code + * Exception}s to be thrown, and matching Java's expectations of a <i>functional interface</i> that + * can be supplied as a lambda expression or method reference. It is named {@code ProcessFunction} + * because it is particularly appropriate anywhere a user needs to provide code that will eventually + * be executed as part of a {@link DoFn} {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement + * ProcessElement} function, which is allowed to declare throwing {@code Exception}. If you need to + * execute user code in a context where arbitrary checked exceptions should not be allowed, require + * that users implement the subinterface {@link SerializableFunction} instead. + * + * <p>For more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, consider extending + * {@link InferableFunction} rather than implementing this interface directly. + * * @param <InputT> input value type * @param <OutputT> output value type */ -public interface SerializableFunction<InputT, OutputT> extends Serializable { +@FunctionalInterface +public interface ProcessFunction<InputT, OutputT> extends Serializable { /** Returns the result of invoking this function on the given input. */ - OutputT apply(InputT input); + OutputT apply(InputT input) throws Exception; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java index b2ac9ed..a1dba9e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java @@ -21,12 +21,19 @@ import java.io.Serializable; /** * A function that computes an output value of type {@code OutputT} from an input value of type - * {@code InputT} and is {@link Serializable}. + * {@code InputT}, is {@link Serializable}, and does not allow checked exceptions to be declared. + * + * <p>To allow checked exceptions, implement the superinterface {@link ProcessFunction} instead. To + * allow more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, see {@link + * InferableFunction}. * * @param <InputT> input value type * @param <OutputT> output value type */ -public interface SerializableFunction<InputT, OutputT> extends Serializable { +@FunctionalInterface +public interface SerializableFunction<InputT, OutputT> + extends ProcessFunction<InputT, OutputT>, Serializable { /** Returns the result of invoking this function on the given input. */ + @Override OutputT apply(InputT input); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index e3f3cc8..0e19272 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -19,8 +19,6 @@ package org.apache.beam.sdk.transforms; import java.lang.reflect.Method; import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -28,8 +26,8 @@ import org.apache.beam.sdk.values.TypeDescriptor; * allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder * Coder} inference. */ -public abstract class SimpleFunction<InputT, OutputT> - implements SerializableFunction<InputT, OutputT>, HasDisplayData { +public abstract class SimpleFunction<InputT, OutputT> extends InferableFunction<InputT, OutputT> + implements SerializableFunction<InputT, OutputT> { @Nullable private final SerializableFunction<InputT, OutputT> fn; @@ -70,38 +68,6 @@ public abstract class SimpleFunction<InputT, OutputT> } /** - * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of - * this {@link SimpleFunction} instance's most-derived class. - * - * <p>See {@link #getOutputTypeDescriptor} for more discussion. - */ - public TypeDescriptor<InputT> getInputTypeDescriptor() { - return new TypeDescriptor<InputT>(this) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of - * this {@link SimpleFunction} instance's most-derived class. - * - * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type - * parameters of its own (including anonymous inner classes), this will be a complete non-generic - * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code - * PCollection<OutputT>}. - */ - public TypeDescriptor<OutputT> getOutputTypeDescriptor() { - return new TypeDescriptor<OutputT>(this) {}; - } - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method to - * provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) {} - - /** * A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type * that is explicitly set. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java index 5f214f3..ad11cd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollection; * PCollection<Iterable<?>>} to a {@link PCollection PCollection<String>}. * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ public final class ToString { private ToString() { @@ -96,7 +96,7 @@ public final class ToString { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> { @Override @@ -125,7 +125,7 @@ public final class ToString { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class KVs extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> { @@ -160,7 +160,7 @@ public final class ToString { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class Iterables extends PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java index d125356..e605f7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.transforms.Contextful; -import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.ProcessFunction; /** * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java @@ -339,16 +339,16 @@ public class TypeDescriptors { * * <pre>{@code * class Foo<BarT> { - * private SerializableFunction<BarT, String> fn; + * private ProcessFunction<BarT, String> fn; * * TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() { * return TypeDescriptors.extractFromTypeParameters( * fn, - * SerializableFunction.class, + * ProcessFunction.class, * // The actual type of "fn" is matched against the input type of the extractor, * // and the obtained values of type variables of the superclass are substituted * // into the output type of the extractor. - * new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {}); + * new TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>() {}); * } * } * }</pre> @@ -374,20 +374,20 @@ public class TypeDescriptors { public static <T, V> TypeDescriptor<V> extractFromTypeParameters( TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { // Get the type signature of the extractor, e.g. - // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT> + // TypeVariableExtractor<ProcessFunction<BarT, String>, BarT> TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype = (TypeDescriptor<TypeVariableExtractor<T, V>>) TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class); - // Get the actual type argument, e.g. SerializableFunction<BarT, String> + // Get the actual type argument, e.g. ProcessFunction<BarT, String> Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0]; // Get the actual supertype of the type being analyzed, hopefully with all type parameters - // resolved, e.g. SerializableFunction<Integer, String> + // resolved, e.g. ProcessFunction<Integer, String> TypeDescriptor supertypeDescriptor = type.getSupertype(supertype); // Substitute actual supertype into the extractor, e.g. - // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer> + // TypeVariableExtractor<ProcessFunction<Integer, String>, Integer> TypeDescriptor<TypeVariableExtractor<T, V>> extractorT = extractorSupertype.where(inputT, supertypeDescriptor.getType()); @@ -397,30 +397,30 @@ public class TypeDescriptors { } /** - * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to - * Java type erasure: may contain unresolved type variables if the type was erased. + * Returns a type descriptor for the input of the given {@link ProcessFunction}, subject to Java + * type erasure: may contain unresolved type variables if the type was erased. */ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( - SerializableFunction<InputT, OutputT> fn) { + ProcessFunction<InputT, OutputT> fn) { return extractFromTypeParameters( fn, - SerializableFunction.class, - new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {}); + ProcessFunction.class, + new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, InputT>() {}); } /** - * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to - * Java type erasure: may contain unresolved type variables if the type was erased. + * Returns a type descriptor for the output of the given {@link ProcessFunction}, subject to Java + * type erasure: may contain unresolved type variables if the type was erased. */ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( - SerializableFunction<InputT, OutputT> fn) { + ProcessFunction<InputT, OutputT> fn) { return extractFromTypeParameters( fn, - SerializableFunction.class, - new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {}); + ProcessFunction.class, + new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, OutputT>() {}); } - /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + /** Like {@link #inputOf(ProcessFunction)} but for {@link Contextful.Fn}. */ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( Contextful.Fn<InputT, OutputT> fn) { return TypeDescriptors.extractFromTypeParameters( @@ -429,7 +429,7 @@ public class TypeDescriptors { new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {}); } - /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + /** Like {@link #outputOf(ProcessFunction)} but for {@link Contextful.Fn}. */ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( Contextful.Fn<InputT, OutputT> fn) { return TypeDescriptors.extractFromTypeParameters( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index a05b7eb..091c28f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -58,6 +58,13 @@ public class FilterTest implements Serializable { } } + static class EvenProcessFn implements ProcessFunction<Integer, Boolean> { + @Override + public Boolean apply(Integer elem) throws Exception { + return elem % 2 == 0; + } + } + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -95,6 +102,16 @@ public class FilterTest implements Serializable { @Test @Category(NeedsRunner.class) + public void testFilterByProcessFunction() { + PCollection<Integer> output = + p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenProcessFn())); + + PAssert.that(output).containsInAnyOrder(2, 4, 6); + p.run(); + } + + @Test + @Category(NeedsRunner.class) public void testFilterLessThan() { PCollection<Integer> output = p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.lessThan(4)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java index 061765a..a927bf9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java @@ -58,12 +58,12 @@ public class FlatMapElementsTest implements Serializable { /** Basic test of {@link FlatMapElements} with a {@link SimpleFunction}. */ @Test @Category(NeedsRunner.class) - public void testFlatMapBasic() throws Exception { + public void testFlatMapSimpleFunction() throws Exception { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) - // Note that FlatMapElements takes a SimpleFunction<InputT, ? extends Iterable<OutputT>> + // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>> // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises // the use of an upper bound. .apply( @@ -79,6 +79,26 @@ public class FlatMapElementsTest implements Serializable { pipeline.run(); } + /** Basic test of {@link FlatMapElements} with an {@link InferableFunction}. */ + @Test + @Category(NeedsRunner.class) + public void testFlatMapInferableFunction() throws Exception { + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + FlatMapElements.via( + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) throws Exception { + return ImmutableList.of(-input, input); + } + })); + + PAssert.that(output).containsInAnyOrder(1, -2, -1, -3, 2, 3); + pipeline.run(); + } + /** Basic test of {@link FlatMapElements} with a {@link Fn} and a side input. */ @Test @Category(NeedsRunner.class) @@ -182,6 +202,20 @@ public class FlatMapElementsTest implements Serializable { } @Test + public void testInferableFunctionClassDisplayData() { + InferableFunction<Integer, List<Integer>> inferableFn = + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) { + return Collections.emptyList(); + } + }; + + FlatMapElements<?, ?> inferableMap = FlatMapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + } + + @Test public void testSimpleFunctionDisplayData() { SimpleFunction<Integer, List<Integer>> simpleFn = new SimpleFunction<Integer, List<Integer>>() { @@ -202,6 +236,26 @@ public class FlatMapElementsTest implements Serializable { } @Test + public void testInferableFunctionDisplayData() { + InferableFunction<Integer, List<Integer>> inferableFn = + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) { + return Collections.emptyList(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "baz")); + } + }; + + FlatMapElements<?, ?> inferableFlatMap = FlatMapElements.via(inferableFn); + assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("class", inferableFn.getClass())); + assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("foo", "baz")); + } + + @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { pipeline @@ -230,7 +284,7 @@ public class FlatMapElementsTest implements Serializable { /** * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a {@link - * SerializableFunction}). + * ProcessFunction}). */ @Test @Category(NeedsRunner.class) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index c10b06e..952885a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -76,10 +76,33 @@ public class MapElementsTest implements Serializable { } } + /** + * An {@link InferableFunction} to test that the coder registry can propagate coders that are + * bound to type variables. + */ + private static class PolymorphicInferableFunction<T> extends InferableFunction<T, T> { + @Override + public T apply(T input) throws Exception { + return input; + } + } + + /** + * An {@link InferableFunction} to test that the coder registry can propagate coders that are + * bound to type variables, when the variable appears nested in the output. + */ + private static class NestedPolymorphicInferableFunction<T> + extends InferableFunction<T, KV<T, String>> { + @Override + public KV<T, String> apply(T input) throws Exception { + return KV.of(input, "hello"); + } + } + /** Basic test of {@link MapElements} with a {@link SimpleFunction}. */ @Test @Category(NeedsRunner.class) - public void testMapBasic() throws Exception { + public void testMapSimpleFunction() throws Exception { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -96,6 +119,26 @@ public class MapElementsTest implements Serializable { pipeline.run(); } + /** Basic test of {@link MapElements} with an {@link InferableFunction}. */ + @Test + @Category(NeedsRunner.class) + public void testMapInferableFunction() throws Exception { + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + MapElements.via( + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return -input; + } + })); + + PAssert.that(output).containsInAnyOrder(-2, -1, -3); + pipeline.run(); + } + /** Basic test of {@link MapElements} with a {@link Fn} and a side input. */ @Test @Category(NeedsRunner.class) @@ -140,6 +183,28 @@ public class MapElementsTest implements Serializable { } /** + * Basic test of {@link MapElements} coder propagation with a parametric {@link + * InferableFunction}. + */ + @Test + public void testPolymorphicInferableFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + pipeline + .apply(Create.of(1, 2, 3)) + .apply("Polymorphic Identity", MapElements.via(new PolymorphicInferableFunction<>())) + .apply( + "Test Consumer", + MapElements.via( + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return input; + } + })); + } + + /** * Test of {@link MapElements} coder propagation with a parametric {@link SimpleFunction} where * the type variable occurs nested within other concrete type constructors. */ @@ -166,12 +231,31 @@ public class MapElementsTest implements Serializable { } /** - * Basic test of {@link MapElements} with a {@link SerializableFunction}. This style is generally - * discouraged in Java 7, in favor of {@link SimpleFunction}. + * Test of {@link MapElements} coder propagation with a parametric {@link InferableFunction} where + * the type variable occurs nested within other concrete type constructors. */ @Test + public void testNestedPolymorphicInferableFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + pipeline + .apply(Create.of(1, 2, 3)) + .apply("Polymorphic Identity", MapElements.via(new NestedPolymorphicInferableFunction<>())) + .apply( + "Test Consumer", + MapElements.via( + new InferableFunction<KV<Integer, String>, Integer>() { + @Override + public Integer apply(KV<Integer, String> input) throws Exception { + return 42; + } + })); + } + + /** Basic test of {@link MapElements} with a {@link ProcessFunction}. */ + @Test @Category(NeedsRunner.class) - public void testMapBasicSerializableFunction() throws Exception { + public void testMapBasicProcessFunction() throws Exception { PCollection<Integer> output = pipeline.apply(Create.of(1, 2, 3)).apply(MapElements.into(integers()).via(input -> -input)); @@ -208,6 +292,35 @@ public class MapElementsTest implements Serializable { pipeline.run(); } + /** + * Tests that when built with a concrete subclass of {@link InferableFunction}, the type + * descriptor of the output reflects its static type. + */ + @Test + @Category(NeedsRunner.class) + public void testInferableFunctionOutputTypeDescriptor() throws Exception { + PCollection<String> output = + pipeline + .apply(Create.of("hello")) + .apply( + MapElements.via( + new InferableFunction<String, String>() { + @Override + public String apply(String input) throws Exception { + return input; + } + })); + assertThat( + output.getTypeDescriptor(), + equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {})); + assertThat( + pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()), + equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {}))); + + // Make sure the pipeline runs too + pipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { @@ -229,6 +342,14 @@ public class MapElementsTest implements Serializable { } @Test + public void testProcessFunctionDisplayData() { + ProcessFunction<Integer, Integer> processFn = input -> input; + + MapElements<?, ?> processMap = MapElements.into(integers()).via(processFn); + assertThat(DisplayData.from(processMap), hasDisplayItem("class", processFn.getClass())); + } + + @Test public void testSimpleFunctionClassDisplayData() { SimpleFunction<?, ?> simpleFn = new SimpleFunction<Integer, Integer>() { @@ -243,6 +364,20 @@ public class MapElementsTest implements Serializable { } @Test + public void testInferableFunctionClassDisplayData() { + InferableFunction<?, ?> inferableFn = + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return input; + } + }; + + MapElements<?, ?> inferableMap = MapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + } + + @Test public void testSimpleFunctionDisplayData() { SimpleFunction<Integer, ?> simpleFn = new SimpleFunction<Integer, Integer>() { @@ -263,6 +398,26 @@ public class MapElementsTest implements Serializable { } @Test + public void testInferableFunctionDisplayData() { + InferableFunction<Integer, ?> inferableFn = + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) { + return input; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "baz")); + } + }; + + MapElements<?, ?> inferableMap = MapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("foo", "baz")); + } + + @Test @Category(ValidatesRunner.class) public void testPrimitiveDisplayData() { SimpleFunction<Integer, ?> mapFn = diff --git a/website/src/contribute/ptransform-style-guide.md b/website/src/contribute/ptransform-style-guide.md index 4cdcb7b..d07529c 100644 --- a/website/src/contribute/ptransform-style-guide.md +++ b/website/src/contribute/ptransform-style-guide.md @@ -395,8 +395,8 @@ If the transform has an aspect of behavior to be customized by a user's code, ma Do: -* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `Se [...] -* If extensibility by user code is necessary inside the transform, pass the user code as a `SerializableFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `SerializableFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `SimpleFunction` and `Serializa [...] +* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `Pr [...] +* If extensibility by user code is necessary inside the transform, pass the user code as a `ProcessFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `ProcessFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `ProcessFunction` and `InferableFunction` [...] Do not: