[BEAM-816] Aggregators are not properly named when reported to Graphite. Added NamedAggregatorsTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6db94249 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6db94249 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6db94249 Branch: refs/heads/python-sdk Commit: 6db942498a6aa20e9fd253871320d0ee4aa9476d Parents: dc61a00 Author: Stas Levin <stasle...@gmail.com> Authored: Tue Oct 25 18:23:23 2016 +0300 Committer: Sela <ans...@paypal.com> Committed: Wed Oct 26 21:16:01 2016 +0300 ---------------------------------------------------------------------- .../metrics/AggregatorMetricSource.java | 9 +- .../metrics/WithNamedAggregatorsSupport.java | 7 +- .../spark/translation/SparkRuntimeContext.java | 2 +- .../runners/spark/ClearAggregatorsRule.java | 33 ------- .../runners/spark/InMemoryMetricsSinkRule.java | 32 ------- .../metrics/sink/ClearAggregatorsRule.java | 33 +++++++ .../metrics/sink/InMemoryMetrics.java | 15 +++- .../metrics/sink/InMemoryMetricsSinkRule.java | 31 +++++++ .../metrics/sink/NamedAggregatorsTest.java | 92 ++++++++++++++++++++ 9 files changed, 179 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java index 0658e04..2a00aec 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java @@ -29,17 +29,18 @@ import 
org.apache.spark.metrics.source.Source; */ public class AggregatorMetricSource implements Source { - private static final String SOURCE_NAME = "NamedAggregators"; + private final String sourceName; private final MetricRegistry metricRegistry = new MetricRegistry(); - public AggregatorMetricSource(final NamedAggregators aggregators) { - metricRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators)); + public AggregatorMetricSource(final String appName, final NamedAggregators aggregators) { + sourceName = appName; + metricRegistry.register("Beam", AggregatorMetric.of(aggregators)); } @Override public String sourceName() { - return SOURCE_NAME; + return sourceName; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java index 88e2211..6932ae6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java @@ -118,8 +118,13 @@ public class WithNamedAggregatorsSupport extends MetricRegistry { @Override public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) { final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators(); + final String parentName = entry.getKey(); final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge()); - return Maps.filterValues(gaugeMap, Predicates.notNull()); + final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap(); + for (String shortName : 
gaugeMap.keySet()) { + fullNameGaugeMap.put(parentName + "." + shortName, gaugeMap.get(shortName)); + } + return Maps.filterValues(fullNameGaugeMap, Predicates.notNull()); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 94c1648..181a111 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -89,7 +89,7 @@ public class SparkRuntimeContext implements Serializable { if (opts.getEnableSparkSinks()) { final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); final AggregatorMetricSource aggregatorMetricSource = - new AggregatorMetricSource(initialValue); + new AggregatorMetricSource(opts.getAppName(), initialValue); // in case the context was not cleared metricsSystem.removeSource(aggregatorMetricSource); metricsSystem.registerSource(aggregatorMetricSource); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java deleted file mode 100644 index beaae13..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; -import org.junit.rules.ExternalResource; - -/** - * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton} - * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. - */ -class ClearAggregatorsRule extends ExternalResource { - @Override - protected void before() throws Throwable { - AccumulatorSingleton.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java deleted file mode 100644 index 506dbbd..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics; -import org.junit.rules.ExternalResource; - -/** - * A rule that cleans the {@link InMemoryMetrics} after the tests has finished. - */ -class InMemoryMetricsSinkRule extends ExternalResource { - @Override - protected void before() throws Throwable { - InMemoryMetrics.clearAll(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java new file mode 100644 index 0000000..79c58a7 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.junit.rules.ExternalResource; + +/** + * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton} + * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. 
+ */ +class ClearAggregatorsRule extends ExternalResource { + @Override + protected void before() throws Throwable { + AccumulatorSingleton.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java index 35e6717..389cd03 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java @@ -20,12 +20,13 @@ package org.apache.beam.runners.spark.aggregators.metrics.sink; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; - +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import java.util.Properties; - import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; import org.apache.spark.metrics.sink.Sink; + /** * An in-memory {@link Sink} implementation for tests. */ @@ -45,9 +46,15 @@ public class InMemoryMetrics implements Sink { public static <T> T valueOf(final String name) { final T retVal; + // this might fail in case we have multiple aggregators with the same suffix after + // the last dot, but it should be good enough for tests. 
if (extendedMetricsRegistry != null - && extendedMetricsRegistry.getGauges().containsKey(name)) { - retVal = (T) extendedMetricsRegistry.getGauges().get(name).getValue(); + && Iterables.any(extendedMetricsRegistry.getGauges().keySet(), + Predicates.containsPattern(name + "$"))) { + String key = + Iterables.find(extendedMetricsRegistry.getGauges().keySet(), + Predicates.containsPattern(name + "$")); + retVal = (T) extendedMetricsRegistry.getGauges().get(key).getValue(); } else { retVal = null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java new file mode 100644 index 0000000..5a3d19d --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import org.junit.rules.ExternalResource; + +/** + * A rule that cleans the {@link InMemoryMetrics} after the tests has finished. + */ +class InMemoryMetricsSinkRule extends ExternalResource { + @Override + protected void before() throws Throwable { + InMemoryMetrics.clearAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java new file mode 100644 index 0000000..194d66a --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; + + +/** + * A test for the NamedAggregators mechanism. + */ +public class NamedAggregatorsTest { + + @Rule + public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); + + @Rule + public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); + + private Pipeline createSparkPipeline() { + final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + return Pipeline.create(options); + } + + private void runPipeline() { + + final List<String> words = + Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); + + final Set<String> expectedCounts = + ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + + final Pipeline pipeline = createSparkPipeline(); + + final PCollection<String> output = + pipeline + .apply(Create.of(words).withCoder(StringUtf8Coder.of())) + .apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(expectedCounts); + + pipeline.run(); + 
} + + @Test + public void testNamedAggregators() throws Exception { + + assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); + + runPipeline(); + + assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d)); + + } +}