Port WindowedWordCount example from OldDoFn to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca9e3372 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca9e3372 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca9e3372 Branch: refs/heads/python-sdk Commit: ca9e337203208c7c5876f0710fb3a45430a5b3a8 Parents: 4ceec0e Author: Kenneth Knowles <k...@google.com> Authored: Fri Jul 22 14:29:01 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Aug 3 18:25:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/examples/WindowedWordCount.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca9e3372/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 17f7da3..842cb54 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -103,14 +103,14 @@ public class WindowedWordCount { static final int WINDOW_SIZE = 1; // Default window duration in minutes /** - * Concept #2: A OldDoFn that sets the data element timestamp. This is a silly method, just for + * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for * this example, for the bounded data case. * * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a * 2-hour period. */ - static class AddTimestampFn extends OldDoFn<String, String> { + static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; @@ -118,7 +118,7 @@ public class WindowedWordCount { this.minTimestamp = new Instant(System.currentTimeMillis()); } - @Override + @ProcessElement public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past two hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); @@ -130,9 +130,9 @@ public class WindowedWordCount { } } - /** A OldDoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> { - @Override + /** A DoFn that converts a Word and Count into a BigQuery table row. */ + static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { + @ProcessElement public void processElement(ProcessContext c) { TableRow row = new TableRow() .set("word", c.element().getKey())