Register beam metrics with a MetricSource in Spark

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31624fed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31624fed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31624fed

Branch: refs/heads/master
Commit: 31624fed4e15dd9e5f8aeac6315ca3cfb73f8616
Parents: 8e203ea
Author: Aviem Zur <aviem...@gmail.com>
Authored: Tue Jan 17 15:03:59 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Feb 15 11:10:48 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineResult.java |   6 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  31 +--
 .../spark/aggregators/AccumulatorSingleton.java | 137 ------------
 .../aggregators/AggregatorsAccumulator.java     | 137 ++++++++++++
 .../spark/aggregators/NamedAggregators.java     |   2 +-
 .../spark/aggregators/SparkAggregators.java     |   2 +-
 .../aggregators/metrics/AggregatorMetric.java   |  44 ----
 .../metrics/AggregatorMetricSource.java         |  50 -----
 .../metrics/WithNamedAggregatorsSupport.java    | 174 ---------------
 .../spark/aggregators/metrics/sink/CsvSink.java |  39 ----
 .../aggregators/metrics/sink/GraphiteSink.java  |  39 ----
 .../aggregators/metrics/sink/package-info.java  |  23 --
 .../runners/spark/metrics/AggregatorMetric.java |  43 ++++
 .../spark/metrics/AggregatorMetricSource.java   |  50 +++++
 .../runners/spark/metrics/CompositeSource.java  |  49 +++++
 .../spark/metrics/MetricsAccumulator.java       |  15 +-
 .../spark/metrics/MetricsAccumulatorParam.java  |   2 +-
 .../runners/spark/metrics/SparkBeamMetric.java  |  62 ++++++
 .../spark/metrics/SparkBeamMetricSource.java    |  50 +++++
 .../spark/metrics/SparkMetricResults.java       |  12 +-
 .../spark/metrics/SparkMetricsContainer.java    |  38 ++--
 .../spark/metrics/WithMetricsSupport.java       | 209 +++++++++++++++++++
 .../runners/spark/metrics/sink/CsvSink.java     |  38 ++++
 .../spark/metrics/sink/GraphiteSink.java        |  38 ++++
 .../spark/metrics/sink/package-info.java        |  22 ++
 .../translation/DoFnRunnerWithMetrics.java      |  57 +++--
 .../spark/translation/SparkContextFactory.java  |   2 -
 .../spark/translation/TransformTranslator.java  |   3 +-
 .../streaming/StreamingTransformTranslator.java |   3 +-
 .../spark/aggregators/ClearAggregatorsRule.java |   5 +-
 .../metrics/sink/InMemoryMetrics.java           |  10 +-
 .../spark/src/test/resources/metrics.properties |  10 +-
 .../src/main/resources/beam/findbugs-filter.xml |   4 +-
 33 files changed, 802 insertions(+), 604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index d0d5569..b0958b0 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -46,8 +45,8 @@ public abstract class SparkPipelineResult implements 
PipelineResult {
 
   protected final Future pipelineExecution;
   protected JavaSparkContext javaSparkContext;
-
   protected PipelineResult.State state;
+  private final SparkMetricResults metricResults = new SparkMetricResults();
 
   SparkPipelineResult(final Future<?> pipelineExecution,
                       final JavaSparkContext javaSparkContext) {
@@ -124,8 +123,7 @@ public abstract class SparkPipelineResult implements 
PipelineResult {
 
   @Override
   public MetricResults metrics() {
-    return new SparkMetricResults(
-        
SparkMetricsContainer.getAccumulator(SparkContextFactory.EMPTY_CONTEXT));
+    return metricResults;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index cc20a30..3dc4857 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -25,11 +25,12 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import 
org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
+import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
+import org.apache.beam.runners.spark.metrics.CompositeSource;
+import org.apache.beam.runners.spark.metrics.SparkBeamMetricSource;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -39,6 +40,7 @@ import 
org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -139,19 +141,22 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
     Optional<CheckpointDir> maybeCheckpointDir =
         opts.isStreaming() ? Optional.of(new 
CheckpointDir(opts.getCheckpointDir()))
             : Optional.<CheckpointDir>absent();
-    final Accumulator<NamedAggregators> accum =
+    final Accumulator<NamedAggregators> aggregatorsAccumulator =
         SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
-    final NamedAggregators initialValue = accum.value();
-    // Instantiate metrics accumulator
-    SparkMetricsContainer.getAccumulator(jsc);
-
+    final NamedAggregators initialValue = aggregatorsAccumulator.value();
     if (opts.getEnableSparkMetricSinks()) {
       final MetricsSystem metricsSystem = 
SparkEnv$.MODULE$.get().metricsSystem();
+      String appName = opts.getAppName();
       final AggregatorMetricSource aggregatorMetricSource =
-          new AggregatorMetricSource(opts.getAppName(), initialValue);
+          new AggregatorMetricSource(appName, initialValue);
+      final SparkBeamMetricSource metricsSource =
+          new SparkBeamMetricSource(appName);
+      final CompositeSource compositeSource =
+          new CompositeSource(appName,
+              metricsSource.metricRegistry(), 
aggregatorMetricSource.metricRegistry());
       // re-register the metrics in case of context re-use
-      metricsSystem.removeSource(aggregatorMetricSource);
-      metricsSystem.registerSource(aggregatorMetricSource);
+      metricsSystem.removeSource(compositeSource);
+      metricsSystem.registerSource(compositeSource);
     }
   }
 
@@ -163,6 +168,8 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
     final Future<?> startPipeline;
     final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
 
+    MetricsEnvironment.setMetricsSupported(true);
+
     detectTranslationMode(pipeline);
 
     if (mOptions.isStreaming()) {
@@ -176,7 +183,7 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
       // Checkpoint aggregator values
       jssc.addStreamingListener(
           new JavaStreamingListenerWrapper(
-              new 
AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
+              new 
AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
 
       // register listeners.
       for (JavaStreamingListener listener: 
mOptions.as(SparkContextOptions.class).getListeners()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
deleted file mode 100644
index 473750c..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * For resilience, {@link Accumulator}s are required to be wrapped in a 
Singleton.
- * @see <a 
href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables";>accumulators</a>
- */
-public class AccumulatorSingleton {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AccumulatorSingleton.class);
-
-  private static final String ACCUMULATOR_CHECKPOINT_FILENAME = 
"beam_aggregators";
-
-  private static volatile Accumulator<NamedAggregators> instance;
-  private static volatile FileSystem fileSystem;
-  private static volatile Path checkpointPath;
-  private static volatile Path tempCheckpointPath;
-  private static volatile Path backupCheckpointPath;
-
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  static Accumulator<NamedAggregators> getInstance(
-      JavaSparkContext jsc,
-      Optional<CheckpointDir> checkpointDir) {
-    if (instance == null) {
-      synchronized (AccumulatorSingleton.class) {
-        if (instance == null) {
-          instance = jsc.sc().accumulator(new NamedAggregators(), new 
AggAccumParam());
-          if (checkpointDir.isPresent()) {
-            recoverValueFromCheckpoint(jsc, checkpointDir.get());
-          }
-        }
-      }
-    }
-    return instance;
-  }
-
-  private static void recoverValueFromCheckpoint(
-      JavaSparkContext jsc,
-      CheckpointDir checkpointDir) {
-    FSDataInputStream is = null;
-    try {
-      Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
-      checkpointPath = new Path(beamCheckpointPath, 
ACCUMULATOR_CHECKPOINT_FILENAME);
-      tempCheckpointPath = checkpointPath.suffix(".tmp");
-      backupCheckpointPath = checkpointPath.suffix(".bak");
-      fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration());
-      if (fileSystem.exists(checkpointPath)) {
-        is = fileSystem.open(checkpointPath);
-      } else if (fileSystem.exists(backupCheckpointPath)) {
-        is = fileSystem.open(backupCheckpointPath);
-      }
-      if (is != null) {
-        ObjectInputStream objectInputStream = new ObjectInputStream(is);
-        NamedAggregators recoveredValue =
-            (NamedAggregators) objectInputStream.readObject();
-        objectInputStream.close();
-        LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
-        instance.setValue(recoveredValue);
-      } else {
-        LOG.info("No accumulator checkpoint found.");
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while reading accumulator 
checkpoint.", e);
-    }
-  }
-
-  private static void checkpoint() throws IOException {
-    if (checkpointPath != null) {
-      if (fileSystem.exists(checkpointPath)) {
-        if (fileSystem.exists(backupCheckpointPath)) {
-          fileSystem.delete(backupCheckpointPath, false);
-        }
-        fileSystem.rename(checkpointPath, backupCheckpointPath);
-      }
-      FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true);
-      ObjectOutputStream oos = new ObjectOutputStream(os);
-      oos.writeObject(instance.value());
-      oos.close();
-      fileSystem.rename(tempCheckpointPath, checkpointPath);
-    }
-  }
-
-  @VisibleForTesting
-  static void clear() {
-    synchronized (AccumulatorSingleton.class) {
-      instance = null;
-    }
-  }
-
-  /**
-   * Spark Listener which checkpoints {@link NamedAggregators} values for 
fault-tolerance.
-   */
-  public static class AccumulatorCheckpointingSparkListener extends 
JavaStreamingListener {
-    @Override
-    public void onBatchCompleted(JavaStreamingListenerBatchCompleted 
batchCompleted) {
-      try {
-        checkpoint();
-      } catch (IOException e) {
-        LOG.error("Failed to checkpoint accumulator singleton.", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
new file mode 100644
index 0000000..187205b
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For resilience, {@link Accumulator}s are required to be wrapped in a 
Singleton.
+ * @see <a 
href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables";>accumulators</a>
+ */
+public class AggregatorsAccumulator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AggregatorsAccumulator.class);
+
+  private static final String ACCUMULATOR_CHECKPOINT_FILENAME = 
"beam_aggregators";
+
+  private static volatile Accumulator<NamedAggregators> instance;
+  private static volatile FileSystem fileSystem;
+  private static volatile Path checkpointPath;
+  private static volatile Path tempCheckpointPath;
+  private static volatile Path backupCheckpointPath;
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  static Accumulator<NamedAggregators> getInstance(
+      JavaSparkContext jsc,
+      Optional<CheckpointDir> checkpointDir) {
+    if (instance == null) {
+      synchronized (AggregatorsAccumulator.class) {
+        if (instance == null) {
+          instance = jsc.sc().accumulator(new NamedAggregators(), new 
AggAccumParam());
+          if (checkpointDir.isPresent()) {
+            recoverValueFromCheckpoint(jsc, checkpointDir.get());
+          }
+        }
+      }
+    }
+    return instance;
+  }
+
+  private static void recoverValueFromCheckpoint(
+      JavaSparkContext jsc,
+      CheckpointDir checkpointDir) {
+    FSDataInputStream is = null;
+    try {
+      Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+      checkpointPath = new Path(beamCheckpointPath, 
ACCUMULATOR_CHECKPOINT_FILENAME);
+      tempCheckpointPath = checkpointPath.suffix(".tmp");
+      backupCheckpointPath = checkpointPath.suffix(".bak");
+      fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration());
+      if (fileSystem.exists(checkpointPath)) {
+        is = fileSystem.open(checkpointPath);
+      } else if (fileSystem.exists(backupCheckpointPath)) {
+        is = fileSystem.open(backupCheckpointPath);
+      }
+      if (is != null) {
+        ObjectInputStream objectInputStream = new ObjectInputStream(is);
+        NamedAggregators recoveredValue =
+            (NamedAggregators) objectInputStream.readObject();
+        objectInputStream.close();
+        LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
+        instance.setValue(recoveredValue);
+      } else {
+        LOG.info("No accumulator checkpoint found.");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while reading accumulator 
checkpoint.", e);
+    }
+  }
+
+  private static void checkpoint() throws IOException {
+    if (checkpointPath != null) {
+      if (fileSystem.exists(checkpointPath)) {
+        if (fileSystem.exists(backupCheckpointPath)) {
+          fileSystem.delete(backupCheckpointPath, false);
+        }
+        fileSystem.rename(checkpointPath, backupCheckpointPath);
+      }
+      FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true);
+      ObjectOutputStream oos = new ObjectOutputStream(os);
+      oos.writeObject(instance.value());
+      oos.close();
+      fileSystem.rename(tempCheckpointPath, checkpointPath);
+    }
+  }
+
+  @VisibleForTesting
+  static void clear() {
+    synchronized (AggregatorsAccumulator.class) {
+      instance = null;
+    }
+  }
+
+  /**
+   * Spark Listener which checkpoints {@link NamedAggregators} values for 
fault-tolerance.
+   */
+  public static class AccumulatorCheckpointingSparkListener extends 
JavaStreamingListener {
+    @Override
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted 
batchCompleted) {
+      try {
+        checkpoint();
+      } catch (IOException e) {
+        LOG.error("Failed to checkpoint accumulator singleton.", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index c876c07..cf6c9ad 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -128,7 +128,7 @@ public class NamedAggregators implements Serializable {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     for (Map.Entry<String, State<?, ?, ?>> e : mNamedAggregators.entrySet()) {
-      sb.append(e.getKey()).append(": ").append(e.getValue().render());
+      sb.append(e.getKey()).append(": 
").append(e.getValue().render()).append(" ");
     }
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index 245c69e..326acfe 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -85,7 +85,7 @@ public class SparkAggregators {
   public static Accumulator<NamedAggregators> getOrCreateNamedAggregators(
       JavaSparkContext jsc,
       Optional<CheckpointDir> checkpointDir) {
-    return AccumulatorSingleton.getInstance(jsc, checkpointDir);
+    return AggregatorsAccumulator.getInstance(jsc, checkpointDir);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
deleted file mode 100644
index c07a069..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics;
-
-import com.codahale.metrics.Metric;
-
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-
-/**
- * An adapter between the {@link NamedAggregators} and codahale's {@link 
Metric}
- * interface.
- */
-public class AggregatorMetric implements Metric {
-
-  private final NamedAggregators namedAggregators;
-
-  private AggregatorMetric(final NamedAggregators namedAggregators) {
-    this.namedAggregators = namedAggregators;
-  }
-
-  public static AggregatorMetric of(final NamedAggregators namedAggregators) {
-    return new AggregatorMetric(namedAggregators);
-  }
-
-  NamedAggregators getNamedAggregators() {
-    return namedAggregators;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
deleted file mode 100644
index 2a00aec..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics;
-
-import com.codahale.metrics.MetricRegistry;
-
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.spark.metrics.source.Source;
-
-/**
- * A Spark {@link Source} that is tailored to expose an {@link 
AggregatorMetric},
- * wrapping an underlying {@link NamedAggregators} instance.
- */
-public class AggregatorMetricSource implements Source {
-
-  private final String sourceName;
-
-  private final MetricRegistry metricRegistry = new MetricRegistry();
-
-  public AggregatorMetricSource(final String appName, final NamedAggregators 
aggregators) {
-    sourceName = appName;
-    metricRegistry.register("Beam", AggregatorMetric.of(aggregators));
-  }
-
-  @Override
-  public String sourceName() {
-    return sourceName;
-  }
-
-  @Override
-  public MetricRegistry metricRegistry() {
-    return metricRegistry;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
deleted file mode 100644
index 5e71280..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-
-import java.util.Map;
-import java.util.SortedMap;
-
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link MetricRegistry} decorator-like* that supports {@link 
AggregatorMetric} by exposing
- * the underlying * {@link 
org.apache.beam.runners.spark.aggregators.NamedAggregators}'
- * aggregators as {@link Gauge}s.
- * <p>
- * *{@link MetricRegistry} is not an interface, so this is not a by-the-book 
decorator.
- * That said, it delegates all metric related getters to the "decorated" 
instance.
- * </p>
- */
-public class WithNamedAggregatorsSupport extends MetricRegistry {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(WithNamedAggregatorsSupport.class);
-
-  private MetricRegistry internalMetricRegistry;
-
-  private WithNamedAggregatorsSupport(final MetricRegistry 
internalMetricRegistry) {
-    this.internalMetricRegistry = internalMetricRegistry;
-  }
-
-  public static WithNamedAggregatorsSupport forRegistry(final MetricRegistry 
metricRegistry) {
-    return new WithNamedAggregatorsSupport(metricRegistry);
-  }
-
-  @Override
-  public SortedMap<String, Timer> getTimers(final MetricFilter filter) {
-    return internalMetricRegistry.getTimers(filter);
-  }
-
-  @Override
-  public SortedMap<String, Meter> getMeters(final MetricFilter filter) {
-    return internalMetricRegistry.getMeters(filter);
-  }
-
-  @Override
-  public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) 
{
-    return internalMetricRegistry.getHistograms(filter);
-  }
-
-  @Override
-  public SortedMap<String, Counter> getCounters(final MetricFilter filter) {
-    return internalMetricRegistry.getCounters(filter);
-  }
-
-  @Override
-  public SortedMap<String, Gauge> getGauges(final MetricFilter filter) {
-    return
-        new ImmutableSortedMap.Builder<String, Gauge>(
-            Ordering.from(String.CASE_INSENSITIVE_ORDER))
-            .putAll(internalMetricRegistry.getGauges(filter))
-            .putAll(extractGauges(internalMetricRegistry, filter))
-            .build();
-  }
-
-  private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry,
-                                           final MetricFilter filter) {
-
-    // find the AggregatorMetric metrics from within all currently registered 
metrics
-    final Optional<Map<String, Gauge>> gauges =
-        FluentIterable
-            .from(metricRegistry.getMetrics().entrySet())
-            .firstMatch(isAggregatorMetric())
-            .transform(toGauges());
-
-    return
-        gauges.isPresent()
-            ? Maps.filterEntries(gauges.get(), matches(filter))
-            : ImmutableMap.<String, Gauge>of();
-  }
-
-  private Function<Map.Entry<String, Metric>, Map<String, Gauge>> toGauges() {
-    return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() {
-      @Override
-      public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
-        final NamedAggregators agg = ((AggregatorMetric) 
entry.getValue()).getNamedAggregators();
-        final String parentName = entry.getKey();
-        final Map<String, Gauge> gaugeMap = 
Maps.transformEntries(agg.renderAll(), toGauge());
-        final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
-        for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
-          fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), 
gaugeEntry.getValue());
-        }
-        return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
-      }
-    };
-  }
-
-  private Maps.EntryTransformer<String, Object, Gauge> toGauge() {
-    return new Maps.EntryTransformer<String, Object, Gauge>() {
-
-      @Override
-      public Gauge transformEntry(final String name, final Object rawValue) {
-        return new Gauge<Double>() {
-
-          @Override
-          public Double getValue() {
-            // at the moment the metric's type is assumed to be
-            // compatible with Double. While far from perfect, it seems 
reasonable at
-            // this point in time
-            try {
-              return Double.parseDouble(rawValue.toString());
-            } catch (final Exception e) {
-              LOG.warn("Failed reporting metric with name [{}], of type [{}], 
since it could not be"
-                  + " converted to double", name, 
rawValue.getClass().getSimpleName(), e);
-              return null;
-            }
-          }
-        };
-      }
-    };
-  }
-
-  private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter 
filter) {
-    return new Predicate<Map.Entry<String, Gauge>>() {
-      @Override
-      public boolean apply(final Map.Entry<String, Gauge> entry) {
-        return filter.matches(entry.getKey(), entry.getValue());
-      }
-    };
-  }
-
-  private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() {
-    return new Predicate<Map.Entry<String, Metric>>() {
-      @Override
-      public boolean apply(final Map.Entry<String, Metric> metricEntry) {
-        return (metricEntry.getValue() instanceof AggregatorMetric);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
deleted file mode 100644
index af1601a..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics.sink;
-
-import com.codahale.metrics.MetricRegistry;
-
-import java.util.Properties;
-
-import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric;
-import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
-import org.apache.spark.metrics.sink.Sink;
-
-/**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} 
metrics
- * to a CSV file.
- */
-public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
-  public CsvSink(final Properties properties,
-                 final MetricRegistry metricRegistry,
-                 final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), 
securityMgr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
deleted file mode 100644
index 7a45ef7..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics.sink;
-
-import com.codahale.metrics.MetricRegistry;
-
-import java.util.Properties;
-
-import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric;
-import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
-import org.apache.spark.metrics.sink.Sink;
-
-/**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} 
metrics
- * to Graphite.
- */
-public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
-  public GraphiteSink(final Properties properties,
-                      final MetricRegistry metricRegistry,
-                      final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), 
securityMgr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
deleted file mode 100644
index 2e6dd0d..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark sinks that support
- * the {@link 
org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric}.
- */
-package org.apache.beam.runners.spark.aggregators.metrics.sink;

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
new file mode 100644
index 0000000..271cc6b
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Metric;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+
+/**
+ * An adapter between the {@link NamedAggregators} and Codahale's {@link 
Metric} interface.
+ */
+public class AggregatorMetric implements Metric {
+
+  private final NamedAggregators namedAggregators;
+
+  private AggregatorMetric(final NamedAggregators namedAggregators) {
+    this.namedAggregators = namedAggregators;
+  }
+
+  public static AggregatorMetric of(final NamedAggregators namedAggregators) {
+    return new AggregatorMetric(namedAggregators);
+  }
+
+  NamedAggregators getNamedAggregators() {
+    return namedAggregators;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
new file mode 100644
index 0000000..b3880e8
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.spark.metrics.source.Source;
+
+
+/**
+ * A Spark {@link Source} that is tailored to expose an {@link 
AggregatorMetric},
+ * wrapping an underlying {@link NamedAggregators} instance.
+ */
+public class AggregatorMetricSource implements Source {
+
+  private final String sourceName;
+
+  private final MetricRegistry metricRegistry = new MetricRegistry();
+
+  public AggregatorMetricSource(final String appName, final NamedAggregators 
aggregators) {
+    sourceName = appName;
+    metricRegistry.register("Beam.Aggregators", 
AggregatorMetric.of(aggregators));
+  }
+
+  @Override
+  public String sourceName() {
+    return sourceName;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java
new file mode 100644
index 0000000..1fb7a17
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.spark.metrics.source.Source;
+
+
+/**
+ * Composite source made up of several {@link MetricRegistry} instances.
+ */
+public class CompositeSource implements Source {
+  private final String name;
+  private final MetricRegistry metricRegistry;
+
+  public CompositeSource(final String name, MetricRegistry... 
metricRegistries) {
+    this.name = name;
+    this.metricRegistry = new MetricRegistry();
+    for (MetricRegistry metricRegistry : metricRegistries) {
+      this.metricRegistry.registerAll(metricRegistry);
+    }
+  }
+
+  @Override
+  public String sourceName() {
+    return name;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index b8f0094..effcbe9 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -27,15 +27,12 @@ import org.apache.spark.api.java.JavaSparkContext;
  * For resilience, {@link Accumulator Accumulators} are required to be wrapped 
in a Singleton.
  * @see <a 
href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables";>accumulators</a>
  */
-class MetricsAccumulator {
+public class MetricsAccumulator {
 
   private static volatile Accumulator<SparkMetricsContainer> instance = null;
 
-  static Accumulator<SparkMetricsContainer> getInstance(JavaSparkContext jsc) {
+  public static Accumulator<SparkMetricsContainer> 
getOrCreateInstance(JavaSparkContext jsc) {
     if (instance == null) {
-      if (jsc == null) {
-        throw new IllegalStateException("Metrics accumulator has not been 
instantiated");
-      }
       synchronized (MetricsAccumulator.class) {
         if (instance == null) {
           // TODO: currently when recovering from checkpoint, Spark does not 
recover the
@@ -50,6 +47,14 @@ class MetricsAccumulator {
     return instance;
   }
 
+  static Accumulator<SparkMetricsContainer> getInstance() {
+    if (instance == null) {
+      throw new IllegalStateException("Metrics accumulator has not been 
instantiated");
+    } else {
+      return instance;
+    }
+  }
+
   @SuppressWarnings("unused")
   @VisibleForTesting
   static void clear() {

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
index 032e283..cd54097 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
@@ -37,6 +37,6 @@ class MetricsAccumulatorParam implements 
AccumulatorParam<SparkMetricsContainer>
 
   @Override
   public SparkMetricsContainer zero(SparkMetricsContainer initialValue) {
-    return initialValue;
+    return new SparkMetricsContainer();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
new file mode 100644
index 0000000..0c656d7
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Metric;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+
+/**
+ * An adapter between the {@link SparkMetricsContainer} and Codahale's {@link 
Metric} interface.
+ */
+class SparkBeamMetric implements Metric {
+  private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]";
+
+  private final SparkMetricResults metricResults = new SparkMetricResults();
+
+  Map<String, ?> renderAll() {
+    Map<String, Object> metrics = new HashMap<>();
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    for (MetricResult<Long> metricResult : metricQueryResults.counters()) {
+      metrics.put(renderName(metricResult), metricResult.committed());
+    }
+    for (MetricResult<DistributionResult> metricResult : 
metricQueryResults.distributions()) {
+      DistributionResult result = metricResult.committed();
+      metrics.put(renderName(metricResult) + ".count", result.count());
+      metrics.put(renderName(metricResult) + ".sum", result.sum());
+      metrics.put(renderName(metricResult) + ".min", result.min());
+      metrics.put(renderName(metricResult) + ".max", result.max());
+      metrics.put(renderName(metricResult) + ".mean", result.mean());
+    }
+    return metrics;
+  }
+
+  private String renderName(MetricResult<?> metricResult) {
+    MetricName metricName = metricResult.name();
+    String rendered = metricResult.step() + "." + metricName.namespace() + "." 
+ metricName.name();
+    return rendered.replaceAll(ILLEGAL_CHARACTERS, "_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
new file mode 100644
index 0000000..24231c3
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.metrics.source.Source;
+
+
+/**
+ * A Spark {@link Source} that is tailored to expose a {@link SparkBeamMetric},
+ * wrapping an underlying {@link SparkMetricsContainer} instance.
+ */
+public class SparkBeamMetricSource implements Source {
+
+  private final String sourceName;
+
+  private final MetricRegistry metricRegistry = new MetricRegistry();
+
+  public SparkBeamMetricSource(final String appName) {
+    sourceName = appName;
+    metricRegistry.register("Beam.Metrics", new SparkBeamMetric());
+  }
+
+  @Override
+  public String sourceName() {
+    return sourceName;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index aea7b2e..64b92b7 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -33,25 +33,19 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.spark.Accumulator;
 
 
 /**
  * Implementation of {@link MetricResults} for the Spark Runner.
  */
 public class SparkMetricResults extends MetricResults {
-  private final Accumulator<SparkMetricsContainer> metricsAccum;
-
-  public SparkMetricResults(Accumulator<SparkMetricsContainer> metricsAccum) {
-    this.metricsAccum = metricsAccum;
-  }
 
   @Override
   public MetricQueryResults queryMetrics(MetricsFilter filter) {
     return new SparkMetricQueryResults(filter);
   }
 
-  private class SparkMetricQueryResults implements MetricQueryResults {
+  private static class SparkMetricQueryResults implements MetricQueryResults {
     private final MetricsFilter filter;
 
     SparkMetricQueryResults(MetricsFilter filter) {
@@ -62,7 +56,7 @@ public class SparkMetricResults extends MetricResults {
     public Iterable<MetricResult<Long>> counters() {
       return
           FluentIterable
-              .from(metricsAccum.value().getCounters())
+              .from(SparkMetricsContainer.getCounters())
               .filter(matchesFilter(filter))
               .transform(TO_COUNTER_RESULT)
               .toList();
@@ -72,7 +66,7 @@ public class SparkMetricResults extends MetricResults {
     public Iterable<MetricResult<DistributionResult>> distributions() {
       return
           FluentIterable
-              .from(metricsAccum.value().getDistributions())
+              .from(SparkMetricsContainer.getDistributions())
               .filter(matchesFilter(filter))
               .transform(TO_DISTRIBUTION_RESULT)
               .toList();

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 0bf9612..234cb81 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -41,8 +41,6 @@ import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,12 +55,6 @@ public class SparkMetricsContainer implements Serializable {
 
   private final Map<MetricKey, MetricAggregator<?>> metrics = new HashMap<>();
 
-  SparkMetricsContainer() {}
-
-  public static Accumulator<SparkMetricsContainer> 
getAccumulator(JavaSparkContext jsc) {
-    return MetricsAccumulator.getInstance(jsc);
-  }
-
   public MetricsContainer getContainer(String stepName) {
     if (metricsContainers == null) {
       synchronized (this) {
@@ -80,10 +72,10 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
-  Collection<CounterAggregator> getCounters() {
+  static Collection<CounterAggregator> getCounters() {
     return
         FluentIterable
-            .from(metrics.values())
+            .from(getInstance().metrics.values())
             .filter(IS_COUNTER)
             .transform(TO_COUNTER)
             .toList();
@@ -106,10 +98,10 @@ public class SparkMetricsContainer implements Serializable 
{
         }
       };
 
-  Collection<DistributionAggregator> getDistributions() {
+  static Collection<DistributionAggregator> getDistributions() {
     return
         FluentIterable
-            .from(metrics.values())
+            .from(getInstance().metrics.values())
             .filter(IS_DISTRIBUTION)
             .transform(TO_DISTRIBUTION)
             .toList();
@@ -132,10 +124,11 @@ public class SparkMetricsContainer implements 
Serializable {
       };
 
   SparkMetricsContainer merge(SparkMetricsContainer other) {
-    return
-        new SparkMetricsContainer()
-            .updated(this.getAggregators())
-            .updated(other.getAggregators());
+    return this.updated(other.getAggregators());
+  }
+
+  private static SparkMetricsContainer getInstance() {
+    return MetricsAccumulator.getInstance().value();
   }
 
   private Collection<MetricAggregator<?>> getAggregators() {
@@ -143,6 +136,10 @@ public class SparkMetricsContainer implements Serializable 
{
   }
 
   private void writeObject(ObjectOutputStream out) throws IOException {
+    // Since MetricsContainer instances are not serializable, materialize a 
serializable map of
+    // MetricsAggregators relating to the same metrics. This is done here, 
when Spark serializes
+    // the SparkMetricsContainer accumulator before sending results back to 
the driver at a point in
+    // time where all the metrics updates have already been made to the 
MetricsContainers.
     materialize();
     out.defaultWriteObject();
   }
@@ -285,4 +282,13 @@ public class SparkMetricsContainer implements Serializable 
{
       return h;
     }
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, ?> metric :  new 
SparkBeamMetric().renderAll().entrySet()) {
+      sb.append(metric.getKey()).append(": 
").append(metric.getValue()).append(" ");
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
new file mode 100644
index 0000000..ff5fc34
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link MetricRegistry} decorator-like that supports {@link 
AggregatorMetric} and
+ * {@link SparkBeamMetric} as {@link Gauge Gauges}.
+ * <p>
+ * {@link MetricRegistry} is not an interface, so this is not a by-the-book 
decorator.
+ * That said, it delegates all metric related getters to the "decorated" 
instance.
+ * </p>
+ */
+public class WithMetricsSupport extends MetricRegistry {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(WithMetricsSupport.class);
+
+  private final MetricRegistry internalMetricRegistry;
+
+  private WithMetricsSupport(final MetricRegistry internalMetricRegistry) {
+    this.internalMetricRegistry = internalMetricRegistry;
+  }
+
+  public static WithMetricsSupport forRegistry(final MetricRegistry 
metricRegistry) {
+    return new WithMetricsSupport(metricRegistry);
+  }
+
+  @Override
+  public SortedMap<String, Timer> getTimers(final MetricFilter filter) {
+    return internalMetricRegistry.getTimers(filter);
+  }
+
+  @Override
+  public SortedMap<String, Meter> getMeters(final MetricFilter filter) {
+    return internalMetricRegistry.getMeters(filter);
+  }
+
+  @Override
+  public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) 
{
+    return internalMetricRegistry.getHistograms(filter);
+  }
+
+  @Override
+  public SortedMap<String, Counter> getCounters(final MetricFilter filter) {
+    return internalMetricRegistry.getCounters(filter);
+  }
+
+  @Override
+  public SortedMap<String, Gauge> getGauges(final MetricFilter filter) {
+    return
+        new ImmutableSortedMap.Builder<String, Gauge>(
+            Ordering.from(String.CASE_INSENSITIVE_ORDER))
+            .putAll(internalMetricRegistry.getGauges(filter))
+            .putAll(extractGauges(internalMetricRegistry, filter))
+            .build();
+  }
+
+  private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry,
+                                           final MetricFilter filter) {
+    Map<String, Gauge> gauges = new HashMap<>();
+
+    // find the AggregatorMetric metrics from within all currently registered 
metrics
+    final Optional<Map<String, Gauge>> aggregatorMetrics =
+        FluentIterable
+            .from(metricRegistry.getMetrics().entrySet())
+            .firstMatch(isAggregatorMetric())
+            .transform(aggregatorMetricToGauges());
+
+    // find the SparkBeamMetric metrics from within all currently registered 
metrics
+    final Optional<Map<String, Gauge>> beamMetrics =
+        FluentIterable
+            .from(metricRegistry.getMetrics().entrySet())
+            .firstMatch(isSparkBeamMetric())
+            .transform(beamMetricToGauges());
+
+    if (aggregatorMetrics.isPresent()) {
+      gauges.putAll(Maps.filterEntries(aggregatorMetrics.get(), 
matches(filter)));
+    }
+
+    if (beamMetrics.isPresent()) {
+      gauges.putAll(Maps.filterEntries(beamMetrics.get(), matches(filter)));
+    }
+
+    return gauges;
+  }
+
+  private Function<Map.Entry<String, Metric>, Map<String, Gauge>> 
aggregatorMetricToGauges() {
+    return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() {
+      @Override
+      public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
+        final NamedAggregators agg = ((AggregatorMetric) 
entry.getValue()).getNamedAggregators();
+        final String parentName = entry.getKey();
+        final Map<String, Gauge> gaugeMap = 
Maps.transformEntries(agg.renderAll(), toGauge());
+        final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
+        for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
+          fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), 
gaugeEntry.getValue());
+        }
+        return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
+      }
+    };
+  }
+
+  private Function<Map.Entry<String, Metric>, Map<String, Gauge>> 
beamMetricToGauges() {
+    return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() {
+      @Override
+      public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
+        final Map<String, ?> metrics = ((SparkBeamMetric) 
entry.getValue()).renderAll();
+        final String parentName = entry.getKey();
+        final Map<String, Gauge> gaugeMap = Maps.transformEntries(metrics, 
toGauge());
+        final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
+        for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
+          fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), 
gaugeEntry.getValue());
+        }
+        return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
+      }
+    };
+  }
+
+  private Maps.EntryTransformer<String, Object, Gauge> toGauge() {
+    return new Maps.EntryTransformer<String, Object, Gauge>() {
+
+      @Override
+      public Gauge transformEntry(final String name, final Object rawValue) {
+        return new Gauge<Double>() {
+
+          @Override
+          public Double getValue() {
+            // at the moment the metric's type is assumed to be
+            // compatible with Double. While far from perfect, it seems 
reasonable at
+            // this point in time
+            try {
+              return Double.parseDouble(rawValue.toString());
+            } catch (final Exception e) {
+              LOG.warn("Failed reporting metric with name [{}], of type [{}], 
since it could not be"
+                  + " converted to double", name, 
rawValue.getClass().getSimpleName(), e);
+              return null;
+            }
+          }
+        };
+      }
+    };
+  }
+
+  private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter 
filter) {
+    return new Predicate<Map.Entry<String, Gauge>>() {
+      @Override
+      public boolean apply(final Map.Entry<String, Gauge> entry) {
+        return filter.matches(entry.getKey(), entry.getValue());
+      }
+    };
+  }
+
+  private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() {
+    return new Predicate<Map.Entry<String, Metric>>() {
+      @Override
+      public boolean apply(final Map.Entry<String, Metric> metricEntry) {
+        return (metricEntry.getValue() instanceof AggregatorMetric);
+      }
+    };
+  }
+
+  private Predicate<Map.Entry<String, Metric>> isSparkBeamMetric() {
+    return new Predicate<Map.Entry<String, Metric>>() {
+      @Override
+      public boolean apply(final Map.Entry<String, Metric> metricEntry) {
+        return (metricEntry.getValue() instanceof SparkBeamMetric);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
new file mode 100644
index 0000000..131aa43
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics.sink;
+
+import com.codahale.metrics.MetricRegistry;
+import java.util.Properties;
+import org.apache.beam.runners.spark.metrics.AggregatorMetric;
+import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
+import org.apache.spark.metrics.sink.Sink;
+
+
+/**
+ * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} 
metrics
+ * to a CSV file.
+ */
+public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
+  public CsvSink(final Properties properties,
+                 final MetricRegistry metricRegistry,
+                 final org.apache.spark.SecurityManager securityMgr) {
+    super(properties, WithMetricsSupport.forRegistry(metricRegistry), 
securityMgr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
new file mode 100644
index 0000000..d496306
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics.sink;
+
+import com.codahale.metrics.MetricRegistry;
+import java.util.Properties;
+import org.apache.beam.runners.spark.metrics.AggregatorMetric;
+import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
+import org.apache.spark.metrics.sink.Sink;
+
+
+/**
+ * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} 
metrics
+ * to Graphite.
+ */
+public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
+  public GraphiteSink(final Properties properties,
+                      final MetricRegistry metricRegistry,
+                      final org.apache.spark.SecurityManager securityMgr) {
+    super(properties, WithMetricsSupport.forRegistry(metricRegistry), 
securityMgr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java
new file mode 100644
index 0000000..ce73d9a
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark sinks that supports beam metrics and aggregators.
+ */
+package org.apache.beam.runners.spark.metrics.sink;

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index d9366ca..fa9a9c2 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -39,8 +39,10 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements 
DoFnRunner<InputT, Outpu
   private final String stepName;
   private final Accumulator<SparkMetricsContainer> metricsAccum;
 
-  DoFnRunnerWithMetrics(String stepName, DoFnRunner<InputT, OutputT> delegate,
-                        Accumulator<SparkMetricsContainer>metricsAccum) {
+  DoFnRunnerWithMetrics(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      Accumulator<SparkMetricsContainer> metricsAccum) {
     this.delegate = delegate;
     this.stepName = stepName;
     this.metricsAccum = metricsAccum;
@@ -48,51 +50,42 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements 
DoFnRunner<InputT, Outpu
 
   @Override
   public void startBundle() {
-    doWithMetricsContainer(new Runnable() {
-      @Override
-      public void run() {
-        delegate.startBundle();
-      }
-    });
+    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
+      delegate.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void processElement(final WindowedValue<InputT> elem) {
-    doWithMetricsContainer(new Runnable() {
-      @Override
-      public void run() {
-        delegate.processElement(elem);
-      }
-    });
+    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
+      delegate.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void onTimer(final String timerId, final BoundedWindow window, final 
Instant timestamp,
                       final TimeDomain timeDomain) {
-    doWithMetricsContainer(new Runnable() {
-      @Override
-      public void run() {
-        delegate.onTimer(timerId, window, timestamp, timeDomain);
-      }
-    });
+    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
+      delegate.onTimer(timerId, window, timestamp, timeDomain);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void finishBundle() {
-    doWithMetricsContainer(new Runnable() {
-      @Override
-      public void run() {
-        delegate.finishBundle();
-      }
-    });
-  }
-
-  private void doWithMetricsContainer(Runnable runnable) {
-    MetricsContainer metricsContainer = 
metricsAccum.localValue().getContainer(stepName);
-    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
-      runnable.run();
+    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
+      delegate.finishBundle();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
+
+  private MetricsContainer metricsContainer() {
+    return metricsAccum.localValue().getContainer(stepName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index bd26ba1..326838a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -33,8 +33,6 @@ import org.slf4j.LoggerFactory;
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
-  public static final JavaSparkContext EMPTY_CONTEXT = null;
-
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to
    * {@code true} then the Spark context will be reused for beam pipelines.

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 3d75142..7f4b708 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
@@ -253,7 +254,7 @@ public final class TransformTranslator {
         Accumulator<NamedAggregators> aggAccum =
             SparkAggregators.getNamedAggregators(jsc);
         Accumulator<SparkMetricsContainer> metricsAccum =
-            SparkMetricsContainer.getAccumulator(jsc);
+            MetricsAccumulator.getOrCreateInstance(jsc);
         Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> 
sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         context.putDataset(transform,

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index c9ab2b3..2bfd172 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -30,6 +30,7 @@ import 
org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.BoundedDataset;
 import org.apache.beam.runners.spark.translation.Dataset;
@@ -396,7 +397,7 @@ final class StreamingTransformTranslator {
             final Accumulator<NamedAggregators> aggAccum =
                 SparkAggregators.getNamedAggregators(jsc);
             final Accumulator<SparkMetricsContainer> metricsAccum =
-                SparkMetricsContainer.getAccumulator(jsc);
+                MetricsAccumulator.getOrCreateInstance(jsc);
             final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, 
SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     jsc, pviews);

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
index 4e91d15..0b31acc 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
@@ -20,8 +20,9 @@ package org.apache.beam.runners.spark.aggregators;
 
 import org.junit.rules.ExternalResource;
 
+
 /**
- * A rule that clears the {@link 
org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
+ * A rule that clears the {@link AggregatorsAccumulator}
  * which represents the Beam {@link 
org.apache.beam.sdk.transforms.Aggregator}s.
  */
 public class ClearAggregatorsRule extends ExternalResource {
@@ -32,6 +33,6 @@ public class ClearAggregatorsRule extends ExternalResource {
   }
 
   public void clearNamedAggregators() {
-    AccumulatorSingleton.clear();
+    AggregatorsAccumulator.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index 389cd03..f6e16ae 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -23,7 +23,7 @@ import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import java.util.Properties;
-import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
+import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
 
@@ -32,17 +32,18 @@ import org.apache.spark.metrics.sink.Sink;
  */
 public class InMemoryMetrics implements Sink {
 
-  private static WithNamedAggregatorsSupport extendedMetricsRegistry;
+  private static WithMetricsSupport extendedMetricsRegistry;
   private static MetricRegistry internalMetricRegistry;
 
+  @SuppressWarnings("UnusedParameters")
   public InMemoryMetrics(final Properties properties,
                          final MetricRegistry metricRegistry,
                          final org.apache.spark.SecurityManager securityMgr) {
-    extendedMetricsRegistry = 
WithNamedAggregatorsSupport.forRegistry(metricRegistry);
+    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
     internalMetricRegistry = metricRegistry;
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({"unchecked", "WeakerAccess"})
   public static <T> T valueOf(final String name) {
     final T retVal;
 
@@ -62,6 +63,7 @@ public class InMemoryMetrics implements Sink {
     return retVal;
   }
 
+  @SuppressWarnings("WeakerAccess")
   public static void clearAll() {
     if (internalMetricRegistry != null) {
       internalMetricRegistry.removeMatching(MetricFilter.ALL);

Reply via email to