Repository: incubator-beam
Updated Branches:
  refs/heads/master 3e1a62815 -> 5a3ace4a7


Converts all easy OldDoFns to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5f329ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5f329ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5f329ee

Branch: refs/heads/master
Commit: f5f329eee4e4a446dafe15b1c42a8f0972360fbc
Parents: 3e1a628
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Dec 9 16:17:46 2016 -0800
Committer: Eugene Kirpichov <[email protected]>
Committed: Thu Dec 15 13:48:27 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 11 ++----
 .../FlattenPCollectionTranslatorTest.java       | 15 +++-----
 .../translation/GroupByKeyTranslatorTest.java   | 21 ++++-------
 .../translation/ParDoBoundTranslatorTest.java   | 39 ++++++++++----------
 .../translation/ReadUnboundTranslatorTest.java  | 15 +++-----
 .../apache/beam/runners/flink/FlinkRunner.java  | 10 ++---
 .../beam/runners/flink/PipelineOptionsTest.java | 11 +++---
 .../flink/streaming/DoFnOperatorTest.java       | 19 +++++-----
 .../flink/streaming/GroupByNullKeyTest.java     | 18 ++++-----
 .../streaming/TopWikipediaSessionsITCase.java   | 10 ++---
 10 files changed, 75 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 899efa3..e5bde46 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,13 +22,11 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
@@ -45,7 +43,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -245,10 +242,10 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     }
   }
 
-  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
-    @Override
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+      c.output(Collections.singletonList(c.element()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
index 6b62a58..f5abc34 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
@@ -19,12 +19,11 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -32,8 +31,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -83,14 +82,10 @@ public class FlattenPCollectionTranslatorTest {
     Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final ArrayList<Object> RESULTS = new ArrayList<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final List<Object> RESULTS = 
Collections.synchronizedList(new ArrayList<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index d627cd9..96963a0 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -28,9 +27,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-
+import java.util.Set;
 import javax.annotation.Nullable;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -42,7 +40,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -106,22 +104,17 @@ public class GroupByKeyTranslatorTest {
 
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }
   }
 
-  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> {
-
-    @Override
+  private static class KeyedByTimestamp<T> extends DoFn<T, KV<Instant, T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(KV.of(c.timestamp(), c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index 2e86152..28b2ec9 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -26,14 +26,13 @@ import com.datatorrent.api.Sink;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -49,7 +48,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -113,8 +113,7 @@ public class ParDoBoundTranslatorTest {
     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  @SuppressWarnings("serial")
-  private static class Add extends OldDoFn<Integer, Integer> {
+  private static class Add extends DoFn<Integer, Integer> {
     private Integer number;
     private PCollectionView<Integer> sideInputView;
 
@@ -126,7 +125,7 @@ public class ParDoBoundTranslatorTest {
       this.sideInputView = sideInputView;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (sideInputView != null) {
         number = c.sideInput(sideInputView);
@@ -135,15 +134,14 @@ public class ParDoBoundTranslatorTest {
     }
   }
 
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    private static final long serialVersionUID = 1L;
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
 
     public EmbeddedCollector() {
       RESULTS.clear();
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }
@@ -207,13 +205,16 @@ public class ParDoBoundTranslatorTest {
     PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
             .apply(Sum.integersGlobally().asSingletonView());
 
-    ApexParDoOperator<Integer, Integer> operator = new 
ApexParDoOperator<>(options,
-        new Add(singletonView), new TupleTag<Integer>(), 
TupleTagList.empty().getAll(),
-        WindowingStrategy.globalDefault(),
-        Collections.<PCollectionView<?>>singletonList(singletonView),
-        coder,
-        new ApexStateInternals.ApexStateInternalsFactory<Void>()
-        );
+    ApexParDoOperator<Integer, Integer> operator =
+        new ApexParDoOperator<>(
+            options,
+            DoFnAdapters.toOldDoFn(new Add(singletonView)),
+            new TupleTag<Integer>(),
+            TupleTagList.empty().getAll(),
+            WindowingStrategy.globalDefault(),
+            Collections.<PCollectionView<?>>singletonList(singletonView),
+            coder,
+            new ApexStateInternals.ApexStateInternalsFactory<Void>());
     operator.setup(null);
     operator.beginWindow(0);
     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
@@ -303,7 +304,7 @@ public class ParDoBoundTranslatorTest {
      Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  private static class TestMultiOutputWithSideInputsFn extends 
OldDoFn<Integer, String> {
+  private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, 
String> {
     private static final long serialVersionUID = 1L;
 
     final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
@@ -315,7 +316,7 @@ public class ParDoBoundTranslatorTest {
       this.sideOutputTupleTags.addAll(sideOutputTupleTags);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       outputToAllWithSideInputs(c, "processing: " + c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
index 96ba663..8e44bab 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
@@ -24,11 +24,10 @@ import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -39,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.junit.Assert;
 import org.junit.Test;
@@ -113,14 +112,10 @@ public class ReadUnboundTranslatorTest {
     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 7c1284b..5f92378 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -24,7 +24,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -440,10 +440,10 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
     }
   }
 
-  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
-    @Override
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+      c.output(Collections.singletonList(c.element()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 3c30fed..e44a705 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -29,7 +29,8 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -98,7 +99,7 @@ public class PipelineOptionsTest {
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
+        DoFnAdapters.toOldDoFn(new TestDoFn()),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -117,7 +118,7 @@ public class PipelineOptionsTest {
   public void parDoBaseClassPipelineOptionsSerializationTest() throws 
Exception {
 
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
+        DoFnAdapters.toOldDoFn(new TestDoFn()),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -151,9 +152,9 @@ public class PipelineOptionsTest {
   }
 
 
-  private static class TestDoFn extends OldDoFn<Object, Object> {
+  private static class TestDoFn extends DoFn<Object, Object> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Assert.assertNotNull(c.getPipelineOptions());
       Assert.assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 913fb8b..65e244a 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -25,7 +25,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-
 import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
@@ -35,6 +34,8 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -96,7 +97,7 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new IdentityDoFn<String>(),
+        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -140,7 +141,7 @@ public class DoFnOperatorTest {
         .build();
 
     DoFnOperator<String, String, RawUnionValue> doFnOperator = new 
DoFnOperator<>(
-        new MultiOutputDoFn(sideOutput1, sideOutput2),
+        DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)),
         coderTypeInfo,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
@@ -200,7 +201,7 @@ public class DoFnOperatorTest {
             .build();
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new IdentityDoFn<String>(),
+        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -280,7 +281,7 @@ public class DoFnOperatorTest {
     });
   }
 
-  private static class MultiOutputDoFn extends OldDoFn<String, String> {
+  private static class MultiOutputDoFn extends DoFn<String, String> {
     private TupleTag<String> sideOutput1;
     private TupleTag<String> sideOutput2;
 
@@ -289,7 +290,7 @@ public class DoFnOperatorTest {
       this.sideOutput2 = sideOutput2;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (c.element().equals("one")) {
         c.sideOutput(sideOutput1, "side: one");
@@ -303,9 +304,9 @@ public class DoFnOperatorTest {
     }
   }
 
-  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(OldDoFn<T, T>.ProcessContext c) throws 
Exception {
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index c6381ee..663b910 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.runners.flink.FlinkTestPipeline;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -64,10 +64,8 @@ public class GroupByNullKeyTest extends 
StreamingProgramTestBase implements Seri
   /**
    * DoFn extracting user and timestamp.
    */
-  public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, 
String>, String> {
-    private static final long serialVersionUID = 0;
-
-    @Override
+  private static class ExtractUserAndTimestamp extends DoFn<KV<Integer, 
String>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<Integer, String> record = c.element();
       int timestamp = record.getKey();
@@ -100,16 +98,16 @@ public class GroupByNullKeyTest extends 
StreamingProgramTestBase implements Seri
               .withAllowedLateness(Duration.ZERO)
               .discardingFiredPanes())
 
-          .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() {
-            @Override
+          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               String elem = c.element();
-              c.output(KV.<Void, String>of((Void) null, elem));
+              c.output(KV.<Void, String>of(null, elem));
             }
           }))
           .apply(GroupByKey.<Void, String>create())
-          .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() {
-            @Override
+          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               KV<Void, Iterable<String>> elem = c.element();
               StringBuilder str = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9410481..9e6bba8 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -100,8 +100,8 @@ public class TopWikipediaSessionsITCase extends 
StreamingProgramTestBase impleme
 
 
 
-      .apply(ParDo.of(new OldDoFn<TableRow, String>() {
-        @Override
+      .apply(ParDo.of(new DoFn<TableRow, String>() {
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           TableRow row = c.element();
           long timestamp = (Integer) row.get("timestamp");
@@ -117,8 +117,8 @@ public class TopWikipediaSessionsITCase extends 
StreamingProgramTestBase impleme
 
       .apply(Count.<String>perElement());
 
-    PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, 
Long>, String>() {
-      @Override
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, 
Long>, String>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         KV<String, Long> el = c.element();
         String out = "user: " + el.getKey() + " value:" + el.getValue();

Reply via email to