Removing Aggregators from Examples
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/904b4130 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/904b4130 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/904b4130 Branch: refs/heads/master Commit: 904b4130b8ba4e9edc0c776da99cbe46d00d9442 Parents: 1d97bdf Author: Pablo <pabl...@google.com> Authored: Tue Mar 7 12:50:38 2017 -0800 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 25 12:45:33 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/DebuggingWordCount.java | 20 +++++++++----------- .../org/apache/beam/examples/WordCount.java | 9 ++++----- .../cookbook/CombinePerKeyExamples.java | 9 ++++----- .../beam/examples/complete/game/GameStats.java | 8 ++++---- .../beam/examples/complete/game/UserScore.java | 8 ++++---- 5 files changed, 25 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 031f317..4c82f46 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -22,14 +22,14 @@ import java.util.List; import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; * <p>New Concepts: * <pre> * 1. Logging using SLF4J, even in a distributed environment - * 2. Creating a custom aggregator (runners have varying levels of support) + * 2. Creating a custom metric (runners have varying levels of support) * 3. Testing your Pipeline via PAssert * </pre> * @@ -90,14 +90,12 @@ public class DebuggingWordCount { } /** - * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each - * runner provides varying levels of support for aggregators, and may expose them + * Concept #2: A custom metric can track values in your pipeline as it runs. Each + * runner provides varying levels of support for metrics, and may expose them * in a dashboard, etc. */ - private final Aggregator<Long, Long> matchedWords = - createAggregator("matchedWords", Sum.ofLongs()); - private final Aggregator<Long, Long> unmatchedWords = - createAggregator("unmatchedWords", Sum.ofLongs()); + private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords"); + private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords"); @ProcessElement public void processElement(ProcessContext c) { @@ -105,14 +103,14 @@ public class DebuggingWordCount { // Log at the "DEBUG" level each element that we match. When executing this pipeline // these log lines will appear only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); - matchedWords.addValue(1L); + matchedWords.inc(); c.output(c.element()); } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); - unmatchedWords.addValue(1L); + unmatchedWords.inc(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index 7e21d47..0c786bc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -19,19 +19,19 @@ package org.apache.beam.examples; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation.Required; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -86,13 +86,12 @@ public class WordCount { * to a ParDo in the pipeline. */ static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); + private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); + emptyLines.inc(); } // Split the line into words. http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 8d13b90..39553a5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -24,18 +24,18 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -79,8 +79,7 @@ public class CombinePerKeyExamples { * outputs word, play_name. */ static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> { - private final Aggregator<Long, Long> smallerWords = - createAggregator("smallerWords", Sum.ofLongs()); + private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords"); @ProcessElement public void processElement(ProcessContext c){ @@ -92,7 +91,7 @@ public class CombinePerKeyExamples { } else { // Track how many smaller words we're not including. This information will be // visible in the Monitoring UI. - smallerWords.addValue(1L); + smallerWords.inc(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 9c79fad..b6c05be 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -26,10 +26,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; @@ -126,8 +127,7 @@ public class GameStats extends LeaderBoard { .apply("ProcessAndFilter", ParDo // use the derived mean total score as a side input .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() { - private final Aggregator<Long, Long> numSpammerUsers = - createAggregator("SpammerUsers", Sum.ofLongs()); + private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers"); @ProcessElement public void processElement(ProcessContext c) { Integer score = c.element().getValue(); @@ -135,7 +135,7 @@ public class GameStats extends LeaderBoard { if (score > (gmc * SCORE_WEIGHT)) { LOG.info("user " + c.element().getKey() + " spammer score " + score + " with mean " + gmc); - numSpammerUsers.addValue(1L); + numSpammerUsers.inc(); c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index b4b023f..0adaabc 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -25,12 +25,13 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -125,8 +126,7 @@ public class UserScore { // Log and count parse errors. private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class); - private final Aggregator<Long, Long> numParseErrors = - createAggregator("ParseErrors", Sum.ofLongs()); + private final Counter numParseErrors = Metrics.counter("main", "ParseErrors"); @ProcessElement public void processElement(ProcessContext c) { @@ -139,7 +139,7 @@ public class UserScore { GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp); c.output(gInfo); } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { - numParseErrors.addValue(1L); + numParseErrors.inc(); LOG.info("Parse error on " + c.element() + ", " + e.getMessage()); } }