Repository: beam
Updated Branches:
  refs/heads/master e5afbb27f -> c442ef81a
[BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/882c654b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/882c654b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/882c654b

Branch: refs/heads/master
Commit: 882c654b1a8aefd2e4281d786448734731db7816
Parents: e5afbb2
Author: Stas Levin <stasle...@gmail.com>
Authored: Sun Feb 5 12:17:35 2017 +0200
Committer: Stas Levin <stasle...@gmail.com>
Committed: Sun Feb 5 15:51:18 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/aggregators/NamedAggregators.java    |  6 ++++--
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index b5aec32..c876c07 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -68,10 +68,12 @@ public class NamedAggregators implements Serializable {
    * @param name Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T> Type to be returned.
-   * @return the value of the aggregator associated with the specified name
+   * @return the value of the aggregator associated with the specified name,
+   * or <code>null</code> if the specified aggregator could not be found.
    */
   public <T> T getValue(String name, Class<T> typeClass) {
-    return typeClass.cast(mNamedAggregators.get(name).render());
+    final State<?, ?, ?> state = mNamedAggregators.get(name);
+    return state != null ? typeClass.cast(state.render()) : null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 3b5dd21..8646510 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,10 +28,13 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -95,4 +98,14 @@ public class NamedAggregatorsTest {
 
     assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
   }
+
+  @Test
+  public void testNonExistingAggregatorName() throws Exception {
+    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    final Long valueOf =
+        SparkAggregators.valueOf(
+            "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options));
+
+    assertThat(valueOf, is(nullValue()));
+  }
 }