Removes OldDoFn from ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9e53c5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9e53c5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9e53c5d Branch: refs/heads/master Commit: e9e53c5d037561aa4dcacfcde69d76a03f3a1571 Parents: 8330bfa Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 9 17:13:43 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Dec 15 13:58:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/ParDo.java | 167 +++---------------- .../apache/beam/sdk/transforms/OldDoFnTest.java | 125 ++++---------- 2 files changed, 55 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 167f5fa..d2149c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; @@ -530,23 +529,6 @@ public class ParDo { return new Unbound().of(fn, displayDataForFn(fn)); } - /** - * Creates a {@link ParDo} {@link PTransform} that will invoke the - * given {@link OldDoFn} function. - * - * <p>The resulting {@link PTransform PTransform's} types have been bound, with the - * input being a {@code PCollection<InputT>} and the output a - * {@code PCollection<OutputT>}, inferred from the types of the argument - * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further - * properties can be set on it first. - * - * @deprecated please port your {@link OldDoFn} to a {@link DoFn} - */ - @Deprecated - public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return new Unbound().of(fn, displayDataForFn(fn)); - } - private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) { return DisplayData.item("fn", fn.getClass()).withLabel("Transform Function"); } @@ -557,12 +539,7 @@ public class ParDo { * the {@link PCollection}. */ private static <InputT, OutputT> void validateWindowType( - PCollection<? extends InputT> input, Serializable fn) { - // No validation for OldDoFn - if (!(fn instanceof DoFn)) { - return; - } - + PCollection<? extends InputT> input, DoFn<InputT, OutputT> fn) { DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); TypeDescriptor<? extends BoundedWindow> actualWindowT = @@ -609,10 +586,6 @@ public class ParDo { } } - private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) { - return DoFnAdapters.toOldDoFn(fn); - } - /** * An incomplete {@link ParDo} transform, with unbound input/output types. * @@ -688,24 +661,9 @@ public class ParDo { return new UnboundMulti<>(name, sideInputs, mainOutputTag, sideOutputTags); } - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but that will invoke the given {@link OldDoFn} - * function, and that has its input and output types bound. Does - * not modify this transform. The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - * - * @deprecated please port your {@link OldDoFn} to a {@link DoFn} - */ - @Deprecated - public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> oldFn) { - return of(oldFn, displayDataForFn(oldFn)); - } - private <InputT, OutputT> Bound<InputT, OutputT> of( - Serializable originalFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { - return new Bound<>(name, originalFn, sideInputs, fnDisplayData); + DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { + return new Bound<>(name, doFn, sideInputs, fnDisplayData); } } @@ -725,12 +683,12 @@ public class ParDo { extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { // Inherits name. private final List<PCollectionView<?>> sideInputs; - private final Serializable fn; + private final DoFn<InputT, OutputT> fn; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; Bound( String name, - Serializable fn, + DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { super(name); @@ -787,7 +745,7 @@ public class ParDo { @Override public PCollection<OutputT> expand(PCollection<? extends InputT> input) { checkArgument( - !isSplittable(getOldFn()), + !isSplittable(getNewFn()), "%s does not support Splittable DoFn", input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); @@ -795,7 +753,7 @@ public class ParDo { input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setTypeDescriptor(getOldFn().getOutputTypeDescriptor()); + .setTypeDescriptor(getNewFn().getOutputTypeDescriptor()); } @Override @@ -803,14 +761,14 @@ public class ParDo { protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input) throws CannotProvideCoderException { return input.getPipeline().getCoderRegistry().getDefaultCoder( - getOldFn().getOutputTypeDescriptor(), - getOldFn().getInputTypeDescriptor(), + getNewFn().getOutputTypeDescriptor(), + getNewFn().getInputTypeDescriptor(), ((PCollection<InputT>) input).getCoder()); } @Override protected String getKindString() { - Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn()); + Class<?> clazz = getNewFn().getClass(); if (clazz.isAnonymousClass()) { return "AnonymousParDo"; } else { @@ -831,44 +789,7 @@ public class ParDo { ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData); } - /** - * @deprecated this method to be converted to return {@link DoFn}. If you want to receive - * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}. - */ - @Deprecated - public OldDoFn<InputT, OutputT> getFn() { - return getOldFn(); - } - - /** - * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return - * a {@link DoFn}. - */ - @Deprecated - public OldDoFn<InputT, OutputT> getOldFn() { - if (fn instanceof OldDoFn) { - return (OldDoFn<InputT, OutputT>) fn; - } else { - return adapt((DoFn<InputT, OutputT>) fn); - } - } - public DoFn<InputT, OutputT> getNewFn() { - if (fn instanceof DoFn) { - return (DoFn<InputT, OutputT>) fn; - } else { - return ((OldDoFn<InputT, OutputT>) fn).toDoFn(); - } - } - - /** - * Returns the {@link OldDoFn} or {@link DoFn} used to create this transform. - * - * @deprecated for migration purposes only. There are some cases of {@link OldDoFn} that are not - * fully supported by wrapping it into a {@link DoFn}, such as {@link RequiresWindowAccess}. - */ - @Deprecated - public Object getOriginalFn() { return fn; } @@ -951,23 +872,8 @@ public class ParDo { return of(fn, displayDataForFn(fn)); } - /** - * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but that will invoke the given - * {@link OldDoFn} function, and that has its input type bound. - * Does not modify this transform. The resulting - * {@link PTransform} is sufficiently specified to be applied, but - * more properties can still be specified. - * - * @deprecated please port your {@link OldDoFn} to a {@link DoFn} - */ - @Deprecated - public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return of(fn, displayDataForFn(fn)); - } - private <InputT> BoundMulti<InputT, OutputT> of( - Serializable fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { + DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); } } @@ -990,11 +896,11 @@ public class ParDo { private final TupleTag<OutputT> mainOutputTag; private final TupleTagList sideOutputTags; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; - private final Serializable fn; + private final DoFn<InputT, OutputT> fn; BoundMulti( String name, - Serializable fn, + DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, @@ -1046,7 +952,7 @@ public class ParDo { @Override public PCollectionTuple expand(PCollection<? extends InputT> input) { checkArgument( - !isSplittable(getOldFn()), + !isSplittable(getNewFn()), "%s does not support Splittable DoFn", input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); @@ -1059,7 +965,7 @@ public class ParDo { // The fn will likely be an instance of an anonymous subclass // such as DoFn<Integer, String> { }, thus will have a high-fidelity // TypeDescriptor for the output type. - outputs.get(mainOutputTag).setTypeDescriptor(getOldFn().getOutputTypeDescriptor()); + outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor()); return outputs; } @@ -1084,7 +990,7 @@ public class ParDo { @Override protected String getKindString() { - Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn()); + Class<?> clazz = getNewFn().getClass(); if (clazz.isAnonymousClass()) { return "AnonymousParMultiDo"; } else { @@ -1095,37 +1001,11 @@ public class ParDo { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData); - } - - /** - * @deprecated this method to be converted to return {@link DoFn}. If you want to receive - * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}. - */ - @Deprecated - public OldDoFn<InputT, OutputT> getFn() { - return getOldFn(); - } - - /** - * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return - * a {@link DoFn}. - */ - @Deprecated - public OldDoFn<InputT, OutputT> getOldFn() { - if (fn instanceof OldDoFn) { - return (OldDoFn<InputT, OutputT>) fn; - } else { - return adapt((DoFn<InputT, OutputT>) fn); - } + ParDo.populateDisplayData(builder, fn, fnDisplayData); } public DoFn<InputT, OutputT> getNewFn() { - if (fn instanceof DoFn) { - return (DoFn<InputT, OutputT>) fn; - } else { - return ((OldDoFn<InputT, OutputT>) fn).toDoFn(); - } + return fn; } public TupleTag<OutputT> getMainOutputTag() { @@ -1148,14 +1028,7 @@ public class ParDo { builder.include("fn", fn).add(fnDisplayData); } - private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) { - DoFn<?, ?> fn = DoFnAdapters.getDoFn(oldDoFn); - if (fn == null) { - return false; - } - return DoFnSignatures - .getSignature(fn.getClass()) - .processElement() - .isSplittable(); + private static boolean isSplittable(DoFn<?, ?> fn) { + return DoFnSignatures.signatureForDoFn(fn).processElement().isSplittable(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java index 07e3078..cc84252 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -18,28 +18,20 @@ package org.apache.beam.sdk.transforms; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import java.io.Serializable; -import java.util.Map; -import org.apache.beam.sdk.AggregatorValues; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; -import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -134,68 +126,52 @@ public class OldDoFnTest implements Serializable { } @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInStartBundleThrows() { - TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { + public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception { + OldDoFn<String, String> fn = new OldDoFn<String, String>() { @Override - public void startBundle(OldDoFn<String, String>.Context c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); + public void processElement(ProcessContext c) throws Exception { } + }; + OldDoFn<String, String>.Context context = createContext(fn); + context.setupDelegateAggregators(); - p.run(); + thrown.expect(isA(IllegalStateException.class)); + fn.createAggregator("anyAggregate", new MaxIntegerFn()); } - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInProcessElementThrows() { - TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { + private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) { + return fn.new Context() { @Override - public void processElement(ProcessContext c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); + public PipelineOptions getPipelineOptions() { + throw new UnsupportedOperationException(); } - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInFinishBundleThrows() { - TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { @Override - public void finishBundle(OldDoFn<String, String>.Context c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); + public void output(String output) { + throw new UnsupportedOperationException(); } @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); + public void outputWithTimestamp(String output, Instant timestamp) { + throw new UnsupportedOperationException(); + } - p.run(); - } + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throw new UnsupportedOperationException(); + } - /** - * Initialize a test pipeline with the specified {@link OldDoFn}. - */ - private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) { - TestPipeline pipeline = TestPipeline.create(); - pipeline.apply(Create.of((InputT) null)) - .apply(ParDo.of(fn)); + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + throw new UnsupportedOperationException(); + } - return pipeline; + @Override + public <AggInputT, AggOutputT> + Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + throw new UnsupportedOperationException(); + } + }; } @Test @@ -209,35 +185,4 @@ public class OldDoFnTest implements Serializable { DisplayData data = DisplayData.from(usesDefault); assertThat(data.items(), empty()); } - - @Test - @Category(NeedsRunner.class) - public void testAggregators() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - CountOddsFn countOdds = new CountOddsFn(); - PCollection<Void> output = pipeline - .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) - .apply(ParDo.of(countOdds)); - PipelineResult result = pipeline.run(); - - AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator); - - Map<String, Integer> valuesMap = values.getValuesAtSteps(); - - assertThat(valuesMap.size(), equalTo(1)); - assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4)); - } - - private static class CountOddsFn extends OldDoFn<Integer, Void> { - @Override - public void processElement(ProcessContext c) throws Exception { - if (c.element() % 2 == 1) { - aggregator.addValue(1); - } - } - - Aggregator<Integer, Integer> aggregator = - createAggregator("odds", new SumIntegerFn()); - } }