Repository: incubator-beam Updated Branches: refs/heads/master 3e1a62815 -> 5a3ace4a7
Converts all easy OldDoFns to DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5f329ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5f329ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5f329ee Branch: refs/heads/master Commit: f5f329eee4e4a446dafe15b1c42a8f0972360fbc Parents: 3e1a628 Author: Eugene Kirpichov <[email protected]> Authored: Fri Dec 9 16:17:46 2016 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Dec 15 13:48:27 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 11 ++---- .../FlattenPCollectionTranslatorTest.java | 15 +++----- .../translation/GroupByKeyTranslatorTest.java | 21 ++++------- .../translation/ParDoBoundTranslatorTest.java | 39 ++++++++++---------- .../translation/ReadUnboundTranslatorTest.java | 15 +++----- .../apache/beam/runners/flink/FlinkRunner.java | 10 ++--- .../beam/runners/flink/PipelineOptionsTest.java | 11 +++--- .../flink/streaming/DoFnOperatorTest.java | 19 +++++----- .../flink/streaming/GroupByNullKeyTest.java | 18 ++++----- .../streaming/TopWikipediaSessionsITCase.java | 10 ++--- 10 files changed, 75 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 899efa3..e5bde46 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -22,13 +22,11 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.google.common.base.Throwables; - import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicReference; - import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.apex.api.Launcher; import org.apache.apex.api.Launcher.AppHandle; @@ -45,7 +43,6 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -245,10 +242,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } } - private static class WrapAsList<T> extends OldDoFn<T, List<T>> { - @Override + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement public void processElement(ProcessContext c) { - c.output(Arrays.asList(c.element())); + c.output(Collections.singletonList(c.element())); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java index 6b62a58..f5abc34 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -19,12 +19,11 @@ package org.apache.beam.runners.apex.translation; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; @@ -32,8 +31,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -83,14 +82,10 @@ public class FlattenPCollectionTranslatorTest { Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS)); } - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final ArrayList<Object> RESULTS = new ArrayList<>(); - - public EmbeddedCollector() { - } + private static class EmbeddedCollector extends DoFn<Object, Void> { + private static final List<Object> RESULTS = Collections.synchronizedList(new ArrayList<>()); - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { RESULTS.add(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java index d627cd9..96963a0 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.io.IOException; import java.io.Serializable; import java.util.Collections; @@ -28,9 +27,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; - +import java.util.Set; import javax.annotation.Nullable; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; @@ -42,7 +40,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; @@ -106,22 +104,17 @@ public class GroupByKeyTranslatorTest { } - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> RESULTS = new HashSet<>(); - - public EmbeddedCollector() { - } + private static class EmbeddedCollector extends DoFn<Object, Void> { + private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>()); - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { RESULTS.add(c.element()); } } - private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> { - - @Override + private static class KeyedByTimestamp<T> extends DoFn<T, KV<Instant, T>> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(KV.of(c.timestamp(), c.element())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java index 2e86152..28b2ec9 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java @@ -26,14 +26,13 @@ import com.datatorrent.api.Sink; import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.regex.Pattern; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; @@ -49,7 +48,8 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; @@ -113,8 +113,7 @@ public class ParDoBoundTranslatorTest { Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } - @SuppressWarnings("serial") - private static class Add extends OldDoFn<Integer, Integer> { + private static class Add extends DoFn<Integer, Integer> { private Integer number; private PCollectionView<Integer> sideInputView; @@ -126,7 +125,7 @@ public class ParDoBoundTranslatorTest { this.sideInputView = sideInputView; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { if (sideInputView != null) { number = c.sideInput(sideInputView); @@ -135,15 +134,14 @@ public class ParDoBoundTranslatorTest { } } - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - private static final long serialVersionUID = 1L; - protected static final HashSet<Object> RESULTS = new HashSet<>(); + private static class EmbeddedCollector extends DoFn<Object, Void> { + private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>()); public EmbeddedCollector() { RESULTS.clear(); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { RESULTS.add(c.element()); } @@ -207,13 +205,16 @@ public class ParDoBoundTranslatorTest { PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1)) .apply(Sum.integersGlobally().asSingletonView()); - ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options, - new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(), - WindowingStrategy.globalDefault(), - Collections.<PCollectionView<?>>singletonList(singletonView), - coder, - new ApexStateInternals.ApexStateInternalsFactory<Void>() - ); + ApexParDoOperator<Integer, Integer> operator = + new ApexParDoOperator<>( + options, + DoFnAdapters.toOldDoFn(new Add(singletonView)), + new TupleTag<Integer>(), + TupleTagList.empty().getAll(), + WindowingStrategy.globalDefault(), + Collections.<PCollectionView<?>>singletonList(singletonView), + coder, + new ApexStateInternals.ApexStateInternalsFactory<Void>()); operator.setup(null); operator.beginWindow(0); WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1); @@ -303,7 +304,7 @@ public class ParDoBoundTranslatorTest { Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } - private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> { + private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> { private static final long serialVersionUID = 1L; final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); @@ -315,7 +316,7 @@ public class ParDoBoundTranslatorTest { this.sideOutputTupleTags.addAll(sideOutputTupleTags); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { outputToAllWithSideInputs(c, "processing: " + c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java index 96ba663..8e44bab 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java @@ -24,11 +24,10 @@ import com.google.common.collect.DiscreteDomain; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; - +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; @@ -39,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.Assert; import org.junit.Test; @@ -113,14 +112,10 @@ public class ReadUnboundTranslatorTest { Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> RESULTS = new HashSet<>(); - - public EmbeddedCollector() { - } + private static class EmbeddedCollector extends DoFn<Object, Void> { + private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>()); - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { RESULTS.add(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 7c1284b..5f92378 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -24,7 +24,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -42,7 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -440,10 +440,10 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { } } - private static class WrapAsList<T> extends OldDoFn<T, List<T>> { - @Override + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement public void processElement(ProcessContext c) { - c.output(Arrays.asList(c.element())); + c.output(Collections.singletonList(c.element())); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 3c30fed..e44a705 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -29,7 +29,8 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -98,7 +99,7 @@ public class PipelineOptionsTest { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>( - new TestDoFn(), + DoFnAdapters.toOldDoFn(new TestDoFn()), TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}), new TupleTag<>("main-output"), Collections.<TupleTag<?>>emptyList(), @@ -117,7 +118,7 @@ public class PipelineOptionsTest { public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>( - new TestDoFn(), + DoFnAdapters.toOldDoFn(new TestDoFn()), TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}), new TupleTag<>("main-output"), Collections.<TupleTag<?>>emptyList(), @@ -151,9 +152,9 @@ public class PipelineOptionsTest { } - private static class TestDoFn extends OldDoFn<Object, Object> { + private static class TestDoFn extends DoFn<Object, Object> { - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Assert.assertNotNull(c.getPipelineOptions()); Assert.assertEquals( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 913fb8b..65e244a 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import java.util.Collections; import java.util.HashMap; import javax.annotation.Nullable; @@ -35,6 +34,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PCollectionViewTesting; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -96,7 +97,7 @@ public class DoFnOperatorTest { TupleTag<String> outputTag = new TupleTag<>("main-output"); DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - new IdentityDoFn<String>(), + DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()), coderTypeInfo, outputTag, Collections.<TupleTag<?>>emptyList(), @@ -140,7 +141,7 @@ public class DoFnOperatorTest { .build(); DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>( - new MultiOutputDoFn(sideOutput1, sideOutput2), + DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)), coderTypeInfo, mainOutput, ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2), @@ -200,7 +201,7 @@ public class DoFnOperatorTest { .build(); DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - new IdentityDoFn<String>(), + DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()), coderTypeInfo, outputTag, Collections.<TupleTag<?>>emptyList(), @@ -280,7 +281,7 @@ public class DoFnOperatorTest { }); } - private static class MultiOutputDoFn extends OldDoFn<String, String> { + private static class MultiOutputDoFn extends DoFn<String, String> { private TupleTag<String> sideOutput1; private TupleTag<String> sideOutput2; @@ -289,7 +290,7 @@ public class DoFnOperatorTest { this.sideOutput2 = sideOutput2; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { if (c.element().equals("one")) { c.sideOutput(sideOutput1, "side: one"); @@ -303,9 +304,9 @@ public class DoFnOperatorTest { } } - private static class IdentityDoFn<T> extends OldDoFn<T, T> { - @Override - public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception { + private static class IdentityDoFn<T> extends DoFn<T, T> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index c6381ee..663b910 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -24,8 +24,8 @@ import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -64,10 +64,8 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri /** * DoFn extracting user and timestamp. */ - public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> { - private static final long serialVersionUID = 0; - - @Override + private static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + @ProcessElement public void processElement(ProcessContext c) { KV<Integer, String> record = c.element(); int timestamp = record.getKey(); @@ -100,16 +98,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri .withAllowedLateness(Duration.ZERO) .discardingFiredPanes()) - .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() { - @Override + .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { String elem = c.element(); - c.output(KV.<Void, String>of((Void) null, elem)); + c.output(KV.<Void, String>of(null, elem)); } })) .apply(GroupByKey.<Void, String>create()) - .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() { - @Override + .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { KV<Void, Iterable<String>> elem = c.element(); StringBuilder str = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java index 9410481..9e6bba8 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; @@ -100,8 +100,8 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme - .apply(ParDo.of(new OldDoFn<TableRow, String>() { - @Override + .apply(ParDo.of(new DoFn<TableRow, String>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { TableRow row = c.element(); long timestamp = (Integer) row.get("timestamp"); @@ -117,8 +117,8 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme .apply(Count.<String>perElement()); - PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, Long>, String>() { - @Override + PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { KV<String, Long> el = c.element(); String out = "user: " + el.getKey() + " value:" + el.getValue();
