Repository: beam Updated Branches: refs/heads/master ed7b82e7e -> 144bffd40
Add wrapping of lambda in a SimpleFunction Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23152178 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23152178 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23152178 Branch: refs/heads/master Commit: 23152178f81e635db65a7aae71f47fa67b3dc065 Parents: ed7b82e Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 11:19:42 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 7 08:16:39 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/transforms/SimpleFunction.java | 44 +++++++++++-- .../beam/sdk/transforms/SimpleFunctionTest.java | 43 ++++++++++++ .../sdk/transforms/MapElementsJava8Test.java | 24 ++++++- .../sdk/transforms/SimpleFunctionJava8Test.java | 69 ++++++++++++++++++++ 4 files changed, 170 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index 8604659..db44380 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import java.lang.reflect.Method; +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; @@ -29,6 +31,40 @@ import org.apache.beam.sdk.values.TypeDescriptor; public abstract class SimpleFunction<InputT, OutputT> implements SerializableFunction<InputT, OutputT>, HasDisplayData { + @Nullable + private final SerializableFunction<InputT, OutputT> fn; + + protected SimpleFunction() { + this.fn = null; + // A subclass must override apply if using this constructor. Check that via + // reflection. + try { + Method methodThatMustBeOverridden = + SimpleFunction.class.getDeclaredMethod("apply", new Class[] {Object.class}); + Method methodOnSubclass = + getClass().getMethod("apply", new Class[] {Object.class}); + + if (methodOnSubclass.equals(methodThatMustBeOverridden)) { + throw new IllegalStateException( + "Subclass of SimpleFunction must override 'apply' method" + + " or pass a SerializableFunction to the constructor," + + " usually via a lambda or method reference."); + } + + } catch (NoSuchMethodException exc) { + throw new RuntimeException("Impossible state: missing 'apply' method entirely", exc); + } + } + + protected SimpleFunction(SerializableFunction<InputT, OutputT> fn) { + this.fn = fn; + } + + @Override + public OutputT apply(InputT input) { + return fn.apply(input); + } + public static <InputT, OutputT> SimpleFunction<InputT, OutputT> fromSerializableFunctionWithOutputType( SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { @@ -77,23 +113,17 @@ public abstract class SimpleFunction<InputT, OutputT> private static class SimpleFunctionWithOutputType<InputT, OutputT> extends SimpleFunction<InputT, OutputT> { - private final SerializableFunction<InputT, OutputT> fn; private final TypeDescriptor<OutputT> outputType; public SimpleFunctionWithOutputType( SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { - this.fn = fn; + super(fn); this.outputType = outputType; } @Override - public OutputT apply(InputT input) { - return fn.apply(input); - } - - @Override public TypeDescriptor<OutputT> getOutputTypeDescriptor() { return outputType; } http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java new file mode 100644 index 0000000..bcfb558 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link SimpleFunction}. + */ +@RunWith(JUnit4.class) +public class SimpleFunctionTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFailureIfNotOverridden() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("must override"); + thrown.expectMessage("apply"); + + SimpleFunction<Integer, Integer> broken = new SimpleFunction<Integer, Integer>() {}; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java index ce0f111..7e63a7d 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java @@ -37,11 +37,11 @@ public class MapElementsJava8Test implements Serializable { public final transient TestPipeline pipeline = TestPipeline.create(); /** - * Basic test of {@link MapElements} with a lambda (which is instantiated as a - * {@link SerializableFunction}). + * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link + * SerializableFunction}). */ @Test - public void testMapBasic() throws Exception { + public void testMapLambda() throws Exception { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -55,6 +55,24 @@ public class MapElementsJava8Test implements Serializable { } /** + * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to + * remember its type. + */ + @Test + public void testMapWrappedLambda() throws Exception { + + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + MapElements + .via(new SimpleFunction<Integer, Integer>((Integer i) -> i * 2) {})); + + PAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + /** * Basic test of {@link MapElements} with a method reference. */ @Test http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java new file mode 100644 index 0000000..9beab34 --- /dev/null +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Java 8 tests for {@link SimpleFunction}. + */ +@RunWith(JUnit4.class) +public class SimpleFunctionJava8Test implements Serializable { + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + /** + * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link + * SerializableFunction}). + */ + @Test + public void testGoodTypeForLambda() throws Exception { + SimpleFunction<Integer, String> fn = + new SimpleFunction<Integer, String>((Integer i) -> i.toString()) {}; + + assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); + assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); + } + + /** + * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to + * remember its type. + */ + @Test + public void testGoodTypeForMethodRef() throws Exception { + SimpleFunction<Integer, String> fn = + new SimpleFunction<Integer, String>(SimpleFunctionJava8Test::toStringThisThing) {}; + + assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); + assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); + } + + private static String toStringThisThing(Integer i) { + return i.toString(); + } +}