Register beam metrics with a MetricSource in Spark
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31624fed Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31624fed Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31624fed Branch: refs/heads/master Commit: 31624fed4e15dd9e5f8aeac6315ca3cfb73f8616 Parents: 8e203ea Author: Aviem Zur <aviem...@gmail.com> Authored: Tue Jan 17 15:03:59 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Feb 15 11:10:48 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/SparkPipelineResult.java | 6 +- .../apache/beam/runners/spark/SparkRunner.java | 31 +-- .../spark/aggregators/AccumulatorSingleton.java | 137 ------------ .../aggregators/AggregatorsAccumulator.java | 137 ++++++++++++ .../spark/aggregators/NamedAggregators.java | 2 +- .../spark/aggregators/SparkAggregators.java | 2 +- .../aggregators/metrics/AggregatorMetric.java | 44 ---- .../metrics/AggregatorMetricSource.java | 50 ----- .../metrics/WithNamedAggregatorsSupport.java | 174 --------------- .../spark/aggregators/metrics/sink/CsvSink.java | 39 ---- .../aggregators/metrics/sink/GraphiteSink.java | 39 ---- .../aggregators/metrics/sink/package-info.java | 23 -- .../runners/spark/metrics/AggregatorMetric.java | 43 ++++ .../spark/metrics/AggregatorMetricSource.java | 50 +++++ .../runners/spark/metrics/CompositeSource.java | 49 +++++ .../spark/metrics/MetricsAccumulator.java | 15 +- .../spark/metrics/MetricsAccumulatorParam.java | 2 +- .../runners/spark/metrics/SparkBeamMetric.java | 62 ++++++ .../spark/metrics/SparkBeamMetricSource.java | 50 +++++ .../spark/metrics/SparkMetricResults.java | 12 +- .../spark/metrics/SparkMetricsContainer.java | 38 ++-- .../spark/metrics/WithMetricsSupport.java | 209 +++++++++++++++++++ .../runners/spark/metrics/sink/CsvSink.java | 38 ++++ .../spark/metrics/sink/GraphiteSink.java | 38 ++++ .../spark/metrics/sink/package-info.java | 22 ++ 
.../translation/DoFnRunnerWithMetrics.java | 57 +++-- .../spark/translation/SparkContextFactory.java | 2 - .../spark/translation/TransformTranslator.java | 3 +- .../streaming/StreamingTransformTranslator.java | 3 +- .../spark/aggregators/ClearAggregatorsRule.java | 5 +- .../metrics/sink/InMemoryMetrics.java | 10 +- .../spark/src/test/resources/metrics.properties | 10 +- .../src/main/resources/beam/findbugs-filter.xml | 4 +- 33 files changed, 802 insertions(+), 604 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index d0d5569..b0958b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricResults; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; @@ -46,8 +45,8 @@ public abstract class SparkPipelineResult implements PipelineResult { protected final Future pipelineExecution; protected JavaSparkContext javaSparkContext; - protected PipelineResult.State state; + private final SparkMetricResults metricResults = new SparkMetricResults(); SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext 
javaSparkContext) { @@ -124,8 +123,7 @@ public abstract class SparkPipelineResult implements PipelineResult { @Override public MetricResults metrics() { - return new SparkMetricResults( - SparkMetricsContainer.getAccumulator(SparkContextFactory.EMPTY_CONTEXT)); + return metricResults; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index cc20a30..3dc4857 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -25,11 +25,12 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; -import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; +import org.apache.beam.runners.spark.metrics.AggregatorMetricSource; +import org.apache.beam.runners.spark.metrics.CompositeSource; +import org.apache.beam.runners.spark.metrics.SparkBeamMetricSource; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; @@ -39,6 +40,7 @@ import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; @@ -139,19 +141,22 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { Optional<CheckpointDir> maybeCheckpointDir = opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) : Optional.<CheckpointDir>absent(); - final Accumulator<NamedAggregators> accum = + final Accumulator<NamedAggregators> aggregatorsAccumulator = SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir); - final NamedAggregators initialValue = accum.value(); - // Instantiate metrics accumulator - SparkMetricsContainer.getAccumulator(jsc); - + final NamedAggregators initialValue = aggregatorsAccumulator.value(); if (opts.getEnableSparkMetricSinks()) { final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); + String appName = opts.getAppName(); final AggregatorMetricSource aggregatorMetricSource = - new AggregatorMetricSource(opts.getAppName(), initialValue); + new AggregatorMetricSource(appName, initialValue); + final SparkBeamMetricSource metricsSource = + new SparkBeamMetricSource(appName); + final CompositeSource compositeSource = + new CompositeSource(appName, + metricsSource.metricRegistry(), aggregatorMetricSource.metricRegistry()); // re-register the metrics in case of context re-use - metricsSystem.removeSource(aggregatorMetricSource); - metricsSystem.registerSource(aggregatorMetricSource); + metricsSystem.removeSource(compositeSource); + metricsSystem.registerSource(compositeSource); } } @@ -163,6 +168,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { final Future<?> startPipeline; final 
ExecutorService executorService = Executors.newSingleThreadExecutor(); + MetricsEnvironment.setMetricsSupported(true); + detectTranslationMode(pipeline); if (mOptions.isStreaming()) { @@ -176,7 +183,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { // Checkpoint aggregator values jssc.addStreamingListener( new JavaStreamingListenerWrapper( - new AccumulatorSingleton.AccumulatorCheckpointingSparkListener())); + new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener())); // register listeners. for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) { http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java deleted file mode 100644 index 473750c..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.Accumulator; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.streaming.api.java.JavaStreamingListener; -import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton. 
- * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> - */ -public class AccumulatorSingleton { - private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class); - - private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators"; - - private static volatile Accumulator<NamedAggregators> instance; - private static volatile FileSystem fileSystem; - private static volatile Path checkpointPath; - private static volatile Path tempCheckpointPath; - private static volatile Path backupCheckpointPath; - - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - static Accumulator<NamedAggregators> getInstance( - JavaSparkContext jsc, - Optional<CheckpointDir> checkpointDir) { - if (instance == null) { - synchronized (AccumulatorSingleton.class) { - if (instance == null) { - instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam()); - if (checkpointDir.isPresent()) { - recoverValueFromCheckpoint(jsc, checkpointDir.get()); - } - } - } - } - return instance; - } - - private static void recoverValueFromCheckpoint( - JavaSparkContext jsc, - CheckpointDir checkpointDir) { - FSDataInputStream is = null; - try { - Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); - checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME); - tempCheckpointPath = checkpointPath.suffix(".tmp"); - backupCheckpointPath = checkpointPath.suffix(".bak"); - fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration()); - if (fileSystem.exists(checkpointPath)) { - is = fileSystem.open(checkpointPath); - } else if (fileSystem.exists(backupCheckpointPath)) { - is = fileSystem.open(backupCheckpointPath); - } - if (is != null) { - ObjectInputStream objectInputStream = new ObjectInputStream(is); - NamedAggregators recoveredValue = - (NamedAggregators) objectInputStream.readObject(); - objectInputStream.close(); - 
LOG.info("Recovered accumulators from checkpoint: " + recoveredValue); - instance.setValue(recoveredValue); - } else { - LOG.info("No accumulator checkpoint found."); - } - } catch (Exception e) { - throw new RuntimeException("Failure while reading accumulator checkpoint.", e); - } - } - - private static void checkpoint() throws IOException { - if (checkpointPath != null) { - if (fileSystem.exists(checkpointPath)) { - if (fileSystem.exists(backupCheckpointPath)) { - fileSystem.delete(backupCheckpointPath, false); - } - fileSystem.rename(checkpointPath, backupCheckpointPath); - } - FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true); - ObjectOutputStream oos = new ObjectOutputStream(os); - oos.writeObject(instance.value()); - oos.close(); - fileSystem.rename(tempCheckpointPath, checkpointPath); - } - } - - @VisibleForTesting - static void clear() { - synchronized (AccumulatorSingleton.class) { - instance = null; - } - } - - /** - * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance. 
- */ - public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener { - @Override - public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { - try { - checkpoint(); - } catch (IOException e) { - LOG.error("Failed to checkpoint accumulator singleton.", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java new file mode 100644 index 0000000..187205b --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.Accumulator; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingListener; +import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton. + * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> + */ +public class AggregatorsAccumulator { + private static final Logger LOG = LoggerFactory.getLogger(AggregatorsAccumulator.class); + + private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators"; + + private static volatile Accumulator<NamedAggregators> instance; + private static volatile FileSystem fileSystem; + private static volatile Path checkpointPath; + private static volatile Path tempCheckpointPath; + private static volatile Path backupCheckpointPath; + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + static Accumulator<NamedAggregators> getInstance( + JavaSparkContext jsc, + Optional<CheckpointDir> checkpointDir) { + if (instance == null) { + synchronized (AggregatorsAccumulator.class) { + if (instance == null) { + instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam()); + if (checkpointDir.isPresent()) { + recoverValueFromCheckpoint(jsc, checkpointDir.get()); + } + } + } + } + 
return instance; + } + + private static void recoverValueFromCheckpoint( + JavaSparkContext jsc, + CheckpointDir checkpointDir) { + FSDataInputStream is = null; + try { + Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); + checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME); + tempCheckpointPath = checkpointPath.suffix(".tmp"); + backupCheckpointPath = checkpointPath.suffix(".bak"); + fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration()); + if (fileSystem.exists(checkpointPath)) { + is = fileSystem.open(checkpointPath); + } else if (fileSystem.exists(backupCheckpointPath)) { + is = fileSystem.open(backupCheckpointPath); + } + if (is != null) { + ObjectInputStream objectInputStream = new ObjectInputStream(is); + NamedAggregators recoveredValue = + (NamedAggregators) objectInputStream.readObject(); + objectInputStream.close(); + LOG.info("Recovered accumulators from checkpoint: " + recoveredValue); + instance.setValue(recoveredValue); + } else { + LOG.info("No accumulator checkpoint found."); + } + } catch (Exception e) { + throw new RuntimeException("Failure while reading accumulator checkpoint.", e); + } + } + + private static void checkpoint() throws IOException { + if (checkpointPath != null) { + if (fileSystem.exists(checkpointPath)) { + if (fileSystem.exists(backupCheckpointPath)) { + fileSystem.delete(backupCheckpointPath, false); + } + fileSystem.rename(checkpointPath, backupCheckpointPath); + } + FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true); + ObjectOutputStream oos = new ObjectOutputStream(os); + oos.writeObject(instance.value()); + oos.close(); + fileSystem.rename(tempCheckpointPath, checkpointPath); + } + } + + @VisibleForTesting + static void clear() { + synchronized (AggregatorsAccumulator.class) { + instance = null; + } + } + + /** + * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance. 
+ */ + public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener { + @Override + public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { + try { + checkpoint(); + } catch (IOException e) { + LOG.error("Failed to checkpoint accumulator singleton.", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index c876c07..cf6c9ad 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -128,7 +128,7 @@ public class NamedAggregators implements Serializable { public String toString() { StringBuilder sb = new StringBuilder(); for (Map.Entry<String, State<?, ?, ?>> e : mNamedAggregators.entrySet()) { - sb.append(e.getKey()).append(": ").append(e.getValue().render()); + sb.append(e.getKey()).append(": ").append(e.getValue().render()).append(" "); } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 245c69e..326acfe 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -85,7 +85,7 @@ public class SparkAggregators { public static Accumulator<NamedAggregators> getOrCreateNamedAggregators( JavaSparkContext jsc, Optional<CheckpointDir> checkpointDir) { - return AccumulatorSingleton.getInstance(jsc, checkpointDir); + return AggregatorsAccumulator.getInstance(jsc, checkpointDir); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java deleted file mode 100644 index c07a069..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.aggregators.metrics; - -import com.codahale.metrics.Metric; - -import org.apache.beam.runners.spark.aggregators.NamedAggregators; - -/** - * An adapter between the {@link NamedAggregators} and codahale's {@link Metric} - * interface. - */ -public class AggregatorMetric implements Metric { - - private final NamedAggregators namedAggregators; - - private AggregatorMetric(final NamedAggregators namedAggregators) { - this.namedAggregators = namedAggregators; - } - - public static AggregatorMetric of(final NamedAggregators namedAggregators) { - return new AggregatorMetric(namedAggregators); - } - - NamedAggregators getNamedAggregators() { - return namedAggregators; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java deleted file mode 100644 index 2a00aec..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators.metrics; - -import com.codahale.metrics.MetricRegistry; - -import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.spark.metrics.source.Source; - -/** - * A Spark {@link Source} that is tailored to expose an {@link AggregatorMetric}, - * wrapping an underlying {@link NamedAggregators} instance. - */ -public class AggregatorMetricSource implements Source { - - private final String sourceName; - - private final MetricRegistry metricRegistry = new MetricRegistry(); - - public AggregatorMetricSource(final String appName, final NamedAggregators aggregators) { - sourceName = appName; - metricRegistry.register("Beam", AggregatorMetric.of(aggregators)); - } - - @Override - public String sourceName() { - return sourceName; - } - - @Override - public MetricRegistry metricRegistry() { - return metricRegistry; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java deleted file mode 100644 index 5e71280..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java +++ /dev/null @@ -1,174 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators.metrics; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; - -import java.util.Map; -import java.util.SortedMap; - -import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link MetricRegistry} decorator-like* that supports {@link AggregatorMetric} by exposing - * the underlying * {@link org.apache.beam.runners.spark.aggregators.NamedAggregators}' - * aggregators as {@link 
Gauge}s. - * <p> - * *{@link MetricRegistry} is not an interface, so this is not a by-the-book decorator. - * That said, it delegates all metric related getters to the "decorated" instance. - * </p> - */ -public class WithNamedAggregatorsSupport extends MetricRegistry { - - private static final Logger LOG = LoggerFactory.getLogger(WithNamedAggregatorsSupport.class); - - private MetricRegistry internalMetricRegistry; - - private WithNamedAggregatorsSupport(final MetricRegistry internalMetricRegistry) { - this.internalMetricRegistry = internalMetricRegistry; - } - - public static WithNamedAggregatorsSupport forRegistry(final MetricRegistry metricRegistry) { - return new WithNamedAggregatorsSupport(metricRegistry); - } - - @Override - public SortedMap<String, Timer> getTimers(final MetricFilter filter) { - return internalMetricRegistry.getTimers(filter); - } - - @Override - public SortedMap<String, Meter> getMeters(final MetricFilter filter) { - return internalMetricRegistry.getMeters(filter); - } - - @Override - public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) { - return internalMetricRegistry.getHistograms(filter); - } - - @Override - public SortedMap<String, Counter> getCounters(final MetricFilter filter) { - return internalMetricRegistry.getCounters(filter); - } - - @Override - public SortedMap<String, Gauge> getGauges(final MetricFilter filter) { - return - new ImmutableSortedMap.Builder<String, Gauge>( - Ordering.from(String.CASE_INSENSITIVE_ORDER)) - .putAll(internalMetricRegistry.getGauges(filter)) - .putAll(extractGauges(internalMetricRegistry, filter)) - .build(); - } - - private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry, - final MetricFilter filter) { - - // find the AggregatorMetric metrics from within all currently registered metrics - final Optional<Map<String, Gauge>> gauges = - FluentIterable - .from(metricRegistry.getMetrics().entrySet()) - .firstMatch(isAggregatorMetric()) - 
.transform(toGauges()); - - return - gauges.isPresent() - ? Maps.filterEntries(gauges.get(), matches(filter)) - : ImmutableMap.<String, Gauge>of(); - } - - private Function<Map.Entry<String, Metric>, Map<String, Gauge>> toGauges() { - return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() { - @Override - public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) { - final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators(); - final String parentName = entry.getKey(); - final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge()); - final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap(); - for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) { - fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), gaugeEntry.getValue()); - } - return Maps.filterValues(fullNameGaugeMap, Predicates.notNull()); - } - }; - } - - private Maps.EntryTransformer<String, Object, Gauge> toGauge() { - return new Maps.EntryTransformer<String, Object, Gauge>() { - - @Override - public Gauge transformEntry(final String name, final Object rawValue) { - return new Gauge<Double>() { - - @Override - public Double getValue() { - // at the moment the metric's type is assumed to be - // compatible with Double. 
While far from perfect, it seems reasonable at - // this point in time - try { - return Double.parseDouble(rawValue.toString()); - } catch (final Exception e) { - LOG.warn("Failed reporting metric with name [{}], of type [{}], since it could not be" - + " converted to double", name, rawValue.getClass().getSimpleName(), e); - return null; - } - } - }; - } - }; - } - - private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter filter) { - return new Predicate<Map.Entry<String, Gauge>>() { - @Override - public boolean apply(final Map.Entry<String, Gauge> entry) { - return filter.matches(entry.getKey(), entry.getValue()); - } - }; - } - - private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() { - return new Predicate<Map.Entry<String, Metric>>() { - @Override - public boolean apply(final Map.Entry<String, Metric> metricEntry) { - return (metricEntry.getValue() instanceof AggregatorMetric); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java deleted file mode 100644 index af1601a..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators.metrics.sink; - -import com.codahale.metrics.MetricRegistry; - -import java.util.Properties; - -import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric; -import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; -import org.apache.spark.metrics.sink.Sink; - -/** - * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics - * to a CSV file. - */ -public class CsvSink extends org.apache.spark.metrics.sink.CsvSink { - public CsvSink(final Properties properties, - final MetricRegistry metricRegistry, - final org.apache.spark.SecurityManager securityMgr) { - super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), securityMgr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java deleted file mode 100644 index 7a45ef7..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.aggregators.metrics.sink; - -import com.codahale.metrics.MetricRegistry; - -import java.util.Properties; - -import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric; -import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; -import org.apache.spark.metrics.sink.Sink; - -/** - * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics - * to Graphite. 
- */ -public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink { - public GraphiteSink(final Properties properties, - final MetricRegistry metricRegistry, - final org.apache.spark.SecurityManager securityMgr) { - super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), securityMgr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java deleted file mode 100644 index 2e6dd0d..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Spark sinks that support - * the {@link org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric}. 
- */ -package org.apache.beam.runners.spark.aggregators.metrics.sink; http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java new file mode 100644 index 0000000..271cc6b --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.Metric; + +import org.apache.beam.runners.spark.aggregators.NamedAggregators; + +/** + * An adapter between the {@link NamedAggregators} and Codahale's {@link Metric} interface. 
+ */ +public class AggregatorMetric implements Metric { + + private final NamedAggregators namedAggregators; + + private AggregatorMetric(final NamedAggregators namedAggregators) { + this.namedAggregators = namedAggregators; + } + + public static AggregatorMetric of(final NamedAggregators namedAggregators) { + return new AggregatorMetric(namedAggregators); + } + + NamedAggregators getNamedAggregators() { + return namedAggregators; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java new file mode 100644 index 0000000..b3880e8 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.MetricRegistry; +import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.spark.metrics.source.Source; + + +/** + * A Spark {@link Source} that is tailored to expose an {@link AggregatorMetric}, + * wrapping an underlying {@link NamedAggregators} instance. + */ +public class AggregatorMetricSource implements Source { + + private final String sourceName; + + private final MetricRegistry metricRegistry = new MetricRegistry(); + + public AggregatorMetricSource(final String appName, final NamedAggregators aggregators) { + sourceName = appName; + metricRegistry.register("Beam.Aggregators", AggregatorMetric.of(aggregators)); + } + + @Override + public String sourceName() { + return sourceName; + } + + @Override + public MetricRegistry metricRegistry() { + return metricRegistry; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java new file mode 100644 index 0000000..1fb7a17 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/CompositeSource.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.MetricRegistry; +import org.apache.spark.metrics.source.Source; + + +/** + * Composite source made up of several {@link MetricRegistry} instances. + */ +public class CompositeSource implements Source { + private final String name; + private final MetricRegistry metricRegistry; + + public CompositeSource(final String name, MetricRegistry... metricRegistries) { + this.name = name; + this.metricRegistry = new MetricRegistry(); + for (MetricRegistry metricRegistry : metricRegistries) { + this.metricRegistry.registerAll(metricRegistry); + } + } + + @Override + public String sourceName() { + return name; + } + + @Override + public MetricRegistry metricRegistry() { + return metricRegistry; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index b8f0094..effcbe9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -27,15 +27,12 @@ import org.apache.spark.api.java.JavaSparkContext; * For resilience, {@link Accumulator Accumulators} are required to be wrapped in a 
Singleton. * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> */ -class MetricsAccumulator { +public class MetricsAccumulator { private static volatile Accumulator<SparkMetricsContainer> instance = null; - static Accumulator<SparkMetricsContainer> getInstance(JavaSparkContext jsc) { + public static Accumulator<SparkMetricsContainer> getOrCreateInstance(JavaSparkContext jsc) { if (instance == null) { - if (jsc == null) { - throw new IllegalStateException("Metrics accumulator has not been instantiated"); - } synchronized (MetricsAccumulator.class) { if (instance == null) { // TODO: currently when recovering from checkpoint, Spark does not recover the @@ -50,6 +47,14 @@ class MetricsAccumulator { return instance; } + static Accumulator<SparkMetricsContainer> getInstance() { + if (instance == null) { + throw new IllegalStateException("Metrics accumulator has not been instantiated"); + } else { + return instance; + } + } + @SuppressWarnings("unused") @VisibleForTesting static void clear() { http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java index 032e283..cd54097 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java @@ -37,6 +37,6 @@ class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> @Override public SparkMetricsContainer zero(SparkMetricsContainer initialValue) { - return initialValue; + return new SparkMetricsContainer(); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java new file mode 100644 index 0000000..0c656d7 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.Metric; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsFilter; + + +/** + * An adapter between the {@link SparkMetricsContainer} and Codahale's {@link Metric} interface. 
+ */ +class SparkBeamMetric implements Metric { + private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]"; + + private final SparkMetricResults metricResults = new SparkMetricResults(); + + Map<String, ?> renderAll() { + Map<String, Object> metrics = new HashMap<>(); + MetricQueryResults metricQueryResults = + metricResults.queryMetrics(MetricsFilter.builder().build()); + for (MetricResult<Long> metricResult : metricQueryResults.counters()) { + metrics.put(renderName(metricResult), metricResult.committed()); + } + for (MetricResult<DistributionResult> metricResult : metricQueryResults.distributions()) { + DistributionResult result = metricResult.committed(); + metrics.put(renderName(metricResult) + ".count", result.count()); + metrics.put(renderName(metricResult) + ".sum", result.sum()); + metrics.put(renderName(metricResult) + ".min", result.min()); + metrics.put(renderName(metricResult) + ".max", result.max()); + metrics.put(renderName(metricResult) + ".mean", result.mean()); + } + return metrics; + } + + private String renderName(MetricResult<?> metricResult) { + MetricName metricName = metricResult.name(); + String rendered = metricResult.step() + "." + metricName.namespace() + "." 
+ metricName.name(); + return rendered.replaceAll(ILLEGAL_CHARACTERS, "_"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java new file mode 100644 index 0000000..24231c3 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.MetricRegistry; + +import org.apache.spark.metrics.source.Source; + + +/** + * A Spark {@link Source} that is tailored to expose a {@link SparkBeamMetric}, + * wrapping an underlying {@link SparkMetricsContainer} instance. 
+ */ +public class SparkBeamMetricSource implements Source { + + private final String sourceName; + + private final MetricRegistry metricRegistry = new MetricRegistry(); + + public SparkBeamMetricSource(final String appName) { + sourceName = appName; + metricRegistry.register("Beam.Metrics", new SparkBeamMetric()); + } + + @Override + public String sourceName() { + return sourceName; + } + + @Override + public MetricRegistry metricRegistry() { + return metricRegistry; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java index aea7b2e..64b92b7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java @@ -33,25 +33,19 @@ import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.spark.Accumulator; /** * Implementation of {@link MetricResults} for the Spark Runner. 
*/ public class SparkMetricResults extends MetricResults { - private final Accumulator<SparkMetricsContainer> metricsAccum; - - public SparkMetricResults(Accumulator<SparkMetricsContainer> metricsAccum) { - this.metricsAccum = metricsAccum; - } @Override public MetricQueryResults queryMetrics(MetricsFilter filter) { return new SparkMetricQueryResults(filter); } - private class SparkMetricQueryResults implements MetricQueryResults { + private static class SparkMetricQueryResults implements MetricQueryResults { private final MetricsFilter filter; SparkMetricQueryResults(MetricsFilter filter) { @@ -62,7 +56,7 @@ public class SparkMetricResults extends MetricResults { public Iterable<MetricResult<Long>> counters() { return FluentIterable - .from(metricsAccum.value().getCounters()) + .from(SparkMetricsContainer.getCounters()) .filter(matchesFilter(filter)) .transform(TO_COUNTER_RESULT) .toList(); @@ -72,7 +66,7 @@ public class SparkMetricResults extends MetricResults { public Iterable<MetricResult<DistributionResult>> distributions() { return FluentIterable - .from(metricsAccum.value().getDistributions()) + .from(SparkMetricsContainer.getDistributions()) .filter(matchesFilter(filter)) .transform(TO_DISTRIBUTION_RESULT) .toList(); http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java index 0bf9612..234cb81 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java @@ -41,8 +41,6 @@ import org.apache.beam.sdk.metrics.MetricName; import 
org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.spark.Accumulator; -import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,12 +55,6 @@ public class SparkMetricsContainer implements Serializable { private final Map<MetricKey, MetricAggregator<?>> metrics = new HashMap<>(); - SparkMetricsContainer() {} - - public static Accumulator<SparkMetricsContainer> getAccumulator(JavaSparkContext jsc) { - return MetricsAccumulator.getInstance(jsc); - } - public MetricsContainer getContainer(String stepName) { if (metricsContainers == null) { synchronized (this) { @@ -80,10 +72,10 @@ public class SparkMetricsContainer implements Serializable { } } - Collection<CounterAggregator> getCounters() { + static Collection<CounterAggregator> getCounters() { return FluentIterable - .from(metrics.values()) + .from(getInstance().metrics.values()) .filter(IS_COUNTER) .transform(TO_COUNTER) .toList(); @@ -106,10 +98,10 @@ public class SparkMetricsContainer implements Serializable { } }; - Collection<DistributionAggregator> getDistributions() { + static Collection<DistributionAggregator> getDistributions() { return FluentIterable - .from(metrics.values()) + .from(getInstance().metrics.values()) .filter(IS_DISTRIBUTION) .transform(TO_DISTRIBUTION) .toList(); @@ -132,10 +124,11 @@ public class SparkMetricsContainer implements Serializable { }; SparkMetricsContainer merge(SparkMetricsContainer other) { - return - new SparkMetricsContainer() - .updated(this.getAggregators()) - .updated(other.getAggregators()); + return this.updated(other.getAggregators()); + } + + private static SparkMetricsContainer getInstance() { + return MetricsAccumulator.getInstance().value(); } private Collection<MetricAggregator<?>> getAggregators() { @@ -143,6 +136,10 @@ public class SparkMetricsContainer implements Serializable { } 
private void writeObject(ObjectOutputStream out) throws IOException { + // Since MetricsContainer instances are not serializable, materialize a serializable map of + // MetricsAggregators relating to the same metrics. This is done here, when Spark serializes + // the SparkMetricsContainer accumulator before sending results back to the driver at a point in + // time where all the metrics updates have already been made to the MetricsContainers. materialize(); out.defaultWriteObject(); } @@ -285,4 +282,13 @@ public class SparkMetricsContainer implements Serializable { return h; } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, ?> metric : new SparkBeamMetric().renderAll().entrySet()) { + sb.append(metric.getKey()).append(": ").append(metric.getValue()).append(" "); + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java new file mode 100644 index 0000000..ff5fc34 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A decorator-like wrapper around {@link MetricRegistry} that supports {@link AggregatorMetric} and + * {@link SparkBeamMetric} as {@link Gauge Gauges}. + * <p> + * {@link MetricRegistry} is not an interface, so this is not a by-the-book decorator. + * That said, it delegates all metric-related getters to the "decorated" instance.
+ * </p> + */ +public class WithMetricsSupport extends MetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(WithMetricsSupport.class); + + private final MetricRegistry internalMetricRegistry; + + private WithMetricsSupport(final MetricRegistry internalMetricRegistry) { + this.internalMetricRegistry = internalMetricRegistry; + } + + public static WithMetricsSupport forRegistry(final MetricRegistry metricRegistry) { + return new WithMetricsSupport(metricRegistry); + } + + @Override + public SortedMap<String, Timer> getTimers(final MetricFilter filter) { + return internalMetricRegistry.getTimers(filter); + } + + @Override + public SortedMap<String, Meter> getMeters(final MetricFilter filter) { + return internalMetricRegistry.getMeters(filter); + } + + @Override + public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) { + return internalMetricRegistry.getHistograms(filter); + } + + @Override + public SortedMap<String, Counter> getCounters(final MetricFilter filter) { + return internalMetricRegistry.getCounters(filter); + } + + @Override + public SortedMap<String, Gauge> getGauges(final MetricFilter filter) { + return + new ImmutableSortedMap.Builder<String, Gauge>( + Ordering.from(String.CASE_INSENSITIVE_ORDER)) + .putAll(internalMetricRegistry.getGauges(filter)) + .putAll(extractGauges(internalMetricRegistry, filter)) + .build(); + } + + private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry, + final MetricFilter filter) { + Map<String, Gauge> gauges = new HashMap<>(); + + // find the AggregatorMetric metrics from within all currently registered metrics + final Optional<Map<String, Gauge>> aggregatorMetrics = + FluentIterable + .from(metricRegistry.getMetrics().entrySet()) + .firstMatch(isAggregatorMetric()) + .transform(aggregatorMetricToGauges()); + + // find the SparkBeamMetric metrics from within all currently registered metrics + final Optional<Map<String, Gauge>> beamMetrics = + FluentIterable 
+ .from(metricRegistry.getMetrics().entrySet()) + .firstMatch(isSparkBeamMetric()) + .transform(beamMetricToGauges()); + + if (aggregatorMetrics.isPresent()) { + gauges.putAll(Maps.filterEntries(aggregatorMetrics.get(), matches(filter))); + } + + if (beamMetrics.isPresent()) { + gauges.putAll(Maps.filterEntries(beamMetrics.get(), matches(filter))); + } + + return gauges; + } + + private Function<Map.Entry<String, Metric>, Map<String, Gauge>> aggregatorMetricToGauges() { + return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() { + @Override + public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) { + final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators(); + final String parentName = entry.getKey(); + final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge()); + final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap(); + for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) { + fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), gaugeEntry.getValue()); + } + return Maps.filterValues(fullNameGaugeMap, Predicates.notNull()); + } + }; + } + + private Function<Map.Entry<String, Metric>, Map<String, Gauge>> beamMetricToGauges() { + return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() { + @Override + public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) { + final Map<String, ?> metrics = ((SparkBeamMetric) entry.getValue()).renderAll(); + final String parentName = entry.getKey(); + final Map<String, Gauge> gaugeMap = Maps.transformEntries(metrics, toGauge()); + final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap(); + for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) { + fullNameGaugeMap.put(parentName + "." 
+ gaugeEntry.getKey(), gaugeEntry.getValue()); + } + return Maps.filterValues(fullNameGaugeMap, Predicates.notNull()); + } + }; + } + + private Maps.EntryTransformer<String, Object, Gauge> toGauge() { + return new Maps.EntryTransformer<String, Object, Gauge>() { + + @Override + public Gauge transformEntry(final String name, final Object rawValue) { + return new Gauge<Double>() { + + @Override + public Double getValue() { + // at the moment the metric's type is assumed to be + // compatible with Double. While far from perfect, it seems reasonable at + // this point in time + try { + return Double.parseDouble(rawValue.toString()); + } catch (final Exception e) { + LOG.warn("Failed reporting metric with name [{}], of type [{}], since it could not be" + + " converted to double", name, rawValue.getClass().getSimpleName(), e); + return null; + } + } + }; + } + }; + } + + private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter filter) { + return new Predicate<Map.Entry<String, Gauge>>() { + @Override + public boolean apply(final Map.Entry<String, Gauge> entry) { + return filter.matches(entry.getKey(), entry.getValue()); + } + }; + } + + private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() { + return new Predicate<Map.Entry<String, Metric>>() { + @Override + public boolean apply(final Map.Entry<String, Metric> metricEntry) { + return (metricEntry.getValue() instanceof AggregatorMetric); + } + }; + } + + private Predicate<Map.Entry<String, Metric>> isSparkBeamMetric() { + return new Predicate<Map.Entry<String, Metric>>() { + @Override + public boolean apply(final Map.Entry<String, Metric> metricEntry) { + return (metricEntry.getValue() instanceof SparkBeamMetric); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java new file mode 100644 index 0000000..131aa43 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics.sink; + +import com.codahale.metrics.MetricRegistry; +import java.util.Properties; +import org.apache.beam.runners.spark.metrics.AggregatorMetric; +import org.apache.beam.runners.spark.metrics.WithMetricsSupport; +import org.apache.spark.metrics.sink.Sink; + + +/** + * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics + * to a CSV file. 
+ */ +public class CsvSink extends org.apache.spark.metrics.sink.CsvSink { + public CsvSink(final Properties properties, + final MetricRegistry metricRegistry, + final org.apache.spark.SecurityManager securityMgr) { + super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java new file mode 100644 index 0000000..d496306 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.metrics.sink; + +import com.codahale.metrics.MetricRegistry; +import java.util.Properties; +import org.apache.beam.runners.spark.metrics.AggregatorMetric; +import org.apache.beam.runners.spark.metrics.WithMetricsSupport; +import org.apache.spark.metrics.sink.Sink; + + +/** + * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics + * to Graphite. + */ +public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink { + public GraphiteSink(final Properties properties, + final MetricRegistry metricRegistry, + final org.apache.spark.SecurityManager securityMgr) { + super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java new file mode 100644 index 0000000..ce73d9a --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Spark sinks that support Beam metrics and aggregators. + */ +package org.apache.beam.runners.spark.metrics.sink; http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index d9366ca..fa9a9c2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -39,8 +39,10 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, Outpu private final String stepName; private final Accumulator<SparkMetricsContainer> metricsAccum; - DoFnRunnerWithMetrics(String stepName, DoFnRunner<InputT, OutputT> delegate, - Accumulator<SparkMetricsContainer>metricsAccum) { + DoFnRunnerWithMetrics( + String stepName, + DoFnRunner<InputT, OutputT> delegate, + Accumulator<SparkMetricsContainer> metricsAccum) { this.delegate = delegate; this.stepName = stepName; this.metricsAccum = metricsAccum; @@ -48,51 +50,42 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, Outpu @Override public void startBundle() { - doWithMetricsContainer(new Runnable() { - @Override - public void run() { -
delegate.startBundle(); - } - }); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) { + delegate.startBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public void processElement(final WindowedValue<InputT> elem) { - doWithMetricsContainer(new Runnable() { - @Override - public void run() { - delegate.processElement(elem); - } - }); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) { + delegate.processElement(elem); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp, final TimeDomain timeDomain) { - doWithMetricsContainer(new Runnable() { - @Override - public void run() { - delegate.onTimer(timerId, window, timestamp, timeDomain); - } - }); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) { + delegate.onTimer(timerId, window, timestamp, timeDomain); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public void finishBundle() { - doWithMetricsContainer(new Runnable() { - @Override - public void run() { - delegate.finishBundle(); - } - }); - } - - private void doWithMetricsContainer(Runnable runnable) { - MetricsContainer metricsContainer = metricsAccum.localValue().getContainer(stepName); - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { - runnable.run(); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) { + delegate.finishBundle(); } catch (IOException e) { throw new RuntimeException(e); } } + + private MetricsContainer metricsContainer() { + return metricsAccum.localValue().getContainer(stepName); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index bd26ba1..326838a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -33,8 +33,6 @@ import org.slf4j.LoggerFactory; public final class SparkContextFactory { private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class); - public static final JavaSparkContext EMPTY_CONTEXT = null; - /** * If the property {@code beam.spark.test.reuseSparkContext} is set to * {@code true} then the Spark context will be reused for beam pipelines. http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 3d75142..7f4b708 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -42,6 +42,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO; import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat; import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import 
org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; @@ -253,7 +254,7 @@ public final class TransformTranslator { Accumulator<NamedAggregators> aggAccum = SparkAggregators.getNamedAggregators(jsc); Accumulator<SparkMetricsContainer> metricsAccum = - SparkMetricsContainer.getAccumulator(jsc); + MetricsAccumulator.getOrCreateInstance(jsc); Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); context.putDataset(transform, http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index c9ab2b3..2bfd172 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.SparkUnboundedSource; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.BoundedDataset; import org.apache.beam.runners.spark.translation.Dataset; @@ -396,7 +397,7 @@ final class StreamingTransformTranslator { final Accumulator<NamedAggregators> aggAccum = SparkAggregators.getNamedAggregators(jsc); final 
Accumulator<SparkMetricsContainer> metricsAccum = - SparkMetricsContainer.getAccumulator(jsc); + MetricsAccumulator.getOrCreateInstance(jsc); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), jsc, pviews); http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java index 4e91d15..0b31acc 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java @@ -20,8 +20,9 @@ package org.apache.beam.runners.spark.aggregators; import org.junit.rules.ExternalResource; + /** - * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton} + * A rule that clears the {@link AggregatorsAccumulator} * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. 
*/ public class ClearAggregatorsRule extends ExternalResource { @@ -32,6 +33,6 @@ public class ClearAggregatorsRule extends ExternalResource { } public void clearNamedAggregators() { - AccumulatorSingleton.clear(); + AggregatorsAccumulator.clear(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/31624fed/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java index 389cd03..f6e16ae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java @@ -23,7 +23,7 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import java.util.Properties; -import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; +import org.apache.beam.runners.spark.metrics.WithMetricsSupport; import org.apache.spark.metrics.sink.Sink; @@ -32,17 +32,18 @@ import org.apache.spark.metrics.sink.Sink; */ public class InMemoryMetrics implements Sink { - private static WithNamedAggregatorsSupport extendedMetricsRegistry; + private static WithMetricsSupport extendedMetricsRegistry; private static MetricRegistry internalMetricRegistry; + @SuppressWarnings("UnusedParameters") public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry, final org.apache.spark.SecurityManager securityMgr) { - extendedMetricsRegistry = WithNamedAggregatorsSupport.forRegistry(metricRegistry); + extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry); internalMetricRegistry = 
metricRegistry; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "WeakerAccess"}) public static <T> T valueOf(final String name) { final T retVal; @@ -62,6 +63,7 @@ public class InMemoryMetrics implements Sink { return retVal; } + @SuppressWarnings("WeakerAccess") public static void clearAll() { if (internalMetricRegistry != null) { internalMetricRegistry.removeMatching(MetricFilter.ALL);