[BEAM-816] Aggregators are not properly named when reported to Graphite.

Added NamedAggregatorTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6db94249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6db94249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6db94249

Branch: refs/heads/python-sdk
Commit: 6db942498a6aa20e9fd253871320d0ee4aa9476d
Parents: dc61a00
Author: Stas Levin <stasle...@gmail.com>
Authored: Tue Oct 25 18:23:23 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Wed Oct 26 21:16:01 2016 +0300

----------------------------------------------------------------------
 .../metrics/AggregatorMetricSource.java         |  9 +-
 .../metrics/WithNamedAggregatorsSupport.java    |  7 +-
 .../spark/translation/SparkRuntimeContext.java  |  2 +-
 .../runners/spark/ClearAggregatorsRule.java     | 33 -------
 .../runners/spark/InMemoryMetricsSinkRule.java  | 32 -------
 .../metrics/sink/ClearAggregatorsRule.java      | 33 +++++++
 .../metrics/sink/InMemoryMetrics.java           | 15 +++-
 .../metrics/sink/InMemoryMetricsSinkRule.java   | 31 +++++++
 .../metrics/sink/NamedAggregatorsTest.java      | 92 ++++++++++++++++++++
 9 files changed, 179 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
index 0658e04..2a00aec 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
@@ -29,17 +29,18 @@ import org.apache.spark.metrics.source.Source;
  */
 public class AggregatorMetricSource implements Source {
 
-  private static final String SOURCE_NAME = "NamedAggregators";
+  private final String sourceName;
 
   private final MetricRegistry metricRegistry = new MetricRegistry();
 
-  public AggregatorMetricSource(final NamedAggregators aggregators) {
-    metricRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators));
+  public AggregatorMetricSource(final String appName, final NamedAggregators 
aggregators) {
+    sourceName = appName;
+    metricRegistry.register("Beam", AggregatorMetric.of(aggregators));
   }
 
   @Override
   public String sourceName() {
-    return SOURCE_NAME;
+    return sourceName;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
index 88e2211..6932ae6 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
@@ -118,8 +118,13 @@ public class WithNamedAggregatorsSupport extends 
MetricRegistry {
       @Override
       public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
         final NamedAggregators agg = ((AggregatorMetric) 
entry.getValue()).getNamedAggregators();
+        final String parentName = entry.getKey();
         final Map<String, Gauge> gaugeMap = 
Maps.transformEntries(agg.renderAll(), toGauge());
-        return Maps.filterValues(gaugeMap, Predicates.notNull());
+        final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
+        for (String shortName : gaugeMap.keySet()) {
+          fullNameGaugeMap.put(parentName + "." + shortName, 
gaugeMap.get(shortName));
+        }
+        return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 94c1648..181a111 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -89,7 +89,7 @@ public class SparkRuntimeContext implements Serializable {
     if (opts.getEnableSparkSinks()) {
       final MetricsSystem metricsSystem = 
SparkEnv$.MODULE$.get().metricsSystem();
       final AggregatorMetricSource aggregatorMetricSource =
-          new AggregatorMetricSource(initialValue);
+          new AggregatorMetricSource(opts.getAppName(), initialValue);
       // in case the context was not cleared
       metricsSystem.removeSource(aggregatorMetricSource);
       metricsSystem.registerSource(aggregatorMetricSource);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
deleted file mode 100644
index beaae13..0000000
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.junit.rules.ExternalResource;
-
-/**
- * A rule that clears the {@link 
org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
- * which represents the Beam {@link 
org.apache.beam.sdk.transforms.Aggregator}s.
- */
-class ClearAggregatorsRule extends ExternalResource {
-  @Override
-  protected void before() throws Throwable {
-    AccumulatorSingleton.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
deleted file mode 100644
index 506dbbd..0000000
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
-import org.junit.rules.ExternalResource;
-
-/**
- * A rule that cleans the {@link InMemoryMetrics} after the tests has finished.
- */
-class InMemoryMetricsSinkRule extends ExternalResource {
-  @Override
-  protected void before() throws Throwable {
-    InMemoryMetrics.clearAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
new file mode 100644
index 0000000..79c58a7
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link 
org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
+ * which represents the Beam {@link 
org.apache.beam.sdk.transforms.Aggregator}s.
+ */
+class ClearAggregatorsRule extends ExternalResource {
+  @Override
+  protected void before() throws Throwable {
+    AccumulatorSingleton.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index 35e6717..389cd03 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -20,12 +20,13 @@ package 
org.apache.beam.runners.spark.aggregators.metrics.sink;
 
 import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
-
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import java.util.Properties;
-
 import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
+
 /**
  * An in-memory {@link Sink} implementation for tests.
  */
@@ -45,9 +46,15 @@ public class InMemoryMetrics implements Sink {
   public static <T> T valueOf(final String name) {
     final T retVal;
 
+    // this might fail in case we have multiple aggregators with the same 
suffix after
+    // the last dot, but it should be good enough for tests.
     if (extendedMetricsRegistry != null
-        && extendedMetricsRegistry.getGauges().containsKey(name)) {
-      retVal = (T) extendedMetricsRegistry.getGauges().get(name).getValue();
+        && Iterables.any(extendedMetricsRegistry.getGauges().keySet(),
+        Predicates.containsPattern(name + "$"))) {
+      String key =
+          Iterables.find(extendedMetricsRegistry.getGauges().keySet(),
+              Predicates.containsPattern(name + "$"));
+      retVal = (T) extendedMetricsRegistry.getGauges().get(key).getValue();
     } else {
       retVal = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
new file mode 100644
index 0000000..5a3d19d
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that cleans the {@link InMemoryMetrics} after the tests has finished.
+ */
+class InMemoryMetricsSinkRule extends ExternalResource {
+  @Override
+  protected void before() throws Throwable {
+    InMemoryMetrics.clearAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
new file mode 100644
index 0000000..194d66a
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+
+/**
+ * A test for the NamedAggregators mechanism.
+ */
+public class NamedAggregatorsTest {
+
+  @Rule
+  public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
+
+  @Rule
+  public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
+
+  private Pipeline createSparkPipeline() {
+    final SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    return Pipeline.create(options);
+  }
+
+  private void runPipeline() {
+
+    final List<String> words =
+        Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
+
+    final Set<String> expectedCounts =
+        ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+    final Pipeline pipeline = createSparkPipeline();
+
+    final PCollection<String> output =
+        pipeline
+        .apply(Create.of(words).withCoder(StringUtf8Coder.of()))
+        .apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssert.that(output).containsInAnyOrder(expectedCounts);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testNamedAggregators() throws Exception {
+
+    assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
+
+    runPipeline();
+
+    assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
+
+  }
+}

Reply via email to