Repository: beam
Updated Branches:
  refs/heads/master d16715309 -> b6ca062fc
[BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/874c8d0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/874c8d0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/874c8d0d

Branch: refs/heads/master
Commit: 874c8d0da65568b01cd5f184e303d39c7810a8bf
Parents: d167153
Author: Aviem Zur <aviem...@gmail.com>
Authored: Mon Mar 6 20:48:48 2017 +0200
Committer: Stas Levin <stasle...@apache.org>
Committed: Sun Mar 12 10:02:23 2017 +0200

----------------------------------------------------------------------
 .../spark/SparkNativePipelineVisitor.java        |  4 --
 .../beam/runners/spark/SparkPipelineResult.java  |  8 +--
 .../apache/beam/runners/spark/SparkRunner.java   | 65 ++++++++++----------
 .../beam/runners/spark/SparkRunnerDebugger.java  | 30 ++++++---
 .../beam/runners/spark/TestSparkRunner.java      |  4 +-
 .../aggregators/AggregatorsAccumulator.java      | 44 +++++++++----
 .../spark/aggregators/SparkAggregators.java      | 40 ++----------
 .../spark/metrics/AggregatorMetricSource.java    | 11 ++--
 .../spark/metrics/MetricsAccumulator.java        | 38 ++++++++----
 .../spark/metrics/SparkBeamMetricSource.java     | 11 ++--
 .../spark/metrics/SparkMetricsContainer.java     | 17 ++---
 .../spark/translation/TransformTranslator.java   | 13 ++--
 .../SparkRunnerStreamingContextFactory.java      |  3 +
 .../streaming/StreamingTransformTranslator.java  | 10 +--
 .../metrics/sink/NamedAggregatorsTest.java       | 15 +----
 .../ResumeFromCheckpointStreamingTest.java       |  4 +-
 16 files changed, 156 insertions(+), 161 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 056da97..c2784a2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -27,11 +26,9 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; -import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.MapElements; @@ -55,7 +52,6 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator { SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) { super(translator, ctxt); this.transforms = new ArrayList<>(); - MetricsAccumulator.init(ctxt.getSparkContext(),
Optional.<Checkpoint.CheckpointDir>absent()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index ddc1964..ed1e0c8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricResults; import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -84,13 +83,12 @@ public abstract class SparkPipelineResult implements PipelineResult { throws TimeoutException, ExecutionException, InterruptedException; public <T> T getAggregatorValue(final String name, final Class<T> resultType) { - return SparkAggregators.valueOf(name, resultType, javaSparkContext); + return SparkAggregators.valueOf(name, resultType); } @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - return SparkAggregators.valueOf(aggregator, javaSparkContext); + public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) { + return SparkAggregators.valueOf(aggregator); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index a706f00..de648fc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark; -import com.google.common.base.Optional; import com.google.common.collect.Iterables; import java.util.Arrays; import java.util.Collection; @@ -27,8 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.metrics.AggregatorMetricSource; import org.apache.beam.runners.spark.metrics.CompositeSource; @@ -59,7 +56,6 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.spark.Accumulator; import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; @@ -141,31 +137,6 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> { mOptions = options; } - private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) { - Optional<CheckpointDir> maybeCheckpointDir = - opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) - : Optional.<CheckpointDir>absent(); - final Accumulator<NamedAggregators> aggregatorsAccumulator = - SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir); - // Instantiate metrics accumulator - MetricsAccumulator.init(jsc, maybeCheckpointDir); - final NamedAggregators initialValue = aggregatorsAccumulator.value(); - if (opts.getEnableSparkMetricSinks()) { - final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); - String appName = opts.getAppName(); - final AggregatorMetricSource aggregatorMetricSource = - new AggregatorMetricSource(appName, initialValue); - final SparkBeamMetricSource metricsSource = - new SparkBeamMetricSource(appName); - final CompositeSource compositeSource = - new CompositeSource(appName, - metricsSource.metricRegistry(), aggregatorMetricSource.metricRegistry()); - // re-register the metrics in case of context re-use - metricsSystem.removeSource(compositeSource); - metricsSystem.registerSource(compositeSource); - } - } - @Override public SparkPipelineResult run(final Pipeline pipeline) { LOG.info("Executing pipeline using the SparkRunner."); @@ -203,11 +174,16 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { // register Watermarks listener to broadcast the advanced WMs. jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc))); + // We call initAccumulators here even though it is also called in + // SparkRunnerStreamingContextFactory, because the factory is not invoked when resuming + // from a checkpoint. (When not resuming from a checkpoint, initAccumulators is called + // twice, but this is fine since it is idempotent.)
+ initAccumulators(mOptions, jssc.sparkContext()); + startPipeline = executorService.submit(new Runnable() { @Override public void run() { - registerMetrics(mOptions, jssc.sparkContext()); LOG.info("Starting streaming pipeline execution."); jssc.start(); } @@ -218,11 +194,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions); final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline); + initAccumulators(mOptions, jsc); + startPipeline = executorService.submit(new Runnable() { @Override public void run() { - registerMetrics(mOptions, jsc); pipeline.traverseTopologically(new Evaluator(new TransformTranslator.Translator(), evaluationContext)); evaluationContext.computeOutputs(); @@ -233,9 +210,35 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { result = new SparkPipelineResult.BatchMode(startPipeline, jsc); } + if (mOptions.getEnableSparkMetricSinks()) { + registerMetricsSource(mOptions.getAppName()); + } + return result; } + private void registerMetricsSource(String appName) { + final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); + final AggregatorMetricSource aggregatorMetricSource = + new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value()); + final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null); + final CompositeSource compositeSource = + new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(), + aggregatorMetricSource.metricRegistry()); + // re-register the metrics in case of context re-use + metricsSystem.removeSource(compositeSource); + metricsSystem.registerSource(compositeSource); + } + + /** + * Init Metrics/Aggregators accumulators. This method is idempotent. + */ + public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) { + // Init metrics accumulators + MetricsAccumulator.init(opts, jsc); + AggregatorsAccumulator.init(opts, jsc); + } + /** * Detect the translation mode for the pipeline and change options in case streaming * translation is needed. 
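The repeated initAccumulators call above is safe because both MetricsAccumulator.init and AggregatorsAccumulator.init guard a volatile singleton with double-checked locking, as shown in the hunks below. For readers unfamiliar with the pattern, here is a minimal standalone sketch of it; the Holder class and AtomicLong payload are illustrative stand-ins, not part of the patch:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of the init pattern shared by the two accumulator holders.
    final class Holder {
      private static volatile AtomicLong instance = null;

      // Idempotent: only the first call creates the instance; repeat calls are no-ops.
      static void init() {
        if (instance == null) {              // cheap first check, no locking
          synchronized (Holder.class) {
            if (instance == null) {          // re-check under the lock
              instance = new AtomicLong();   // runs at most once per JVM
            }
          }
        }
      }

      // Fails fast when init() has not run, mirroring getInstance() in the patch.
      static AtomicLong getInstance() {
        if (instance == null) {
          throw new IllegalStateException("Holder has not been instantiated");
        }
        return instance;
      }
    }

The volatile field plus the second null check under the lock is what makes concurrent callers (e.g. the runner thread and the streaming context factory) observe a single fully constructed instance.
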
http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java index 395acff..7f7aefc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -20,12 +20,14 @@ package org.apache.beam.runners.spark; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; + import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -53,23 +55,34 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class); - private SparkRunnerDebugger() {} + private final SparkPipelineOptions options; + + private SparkRunnerDebugger(SparkPipelineOptions options) { + this.options = options; + } - @SuppressWarnings("unused") public static SparkRunnerDebugger fromOptions(PipelineOptions options) { - return new SparkRunnerDebugger(); + if (options instanceof TestSparkPipelineOptions) { + TestSparkPipelineOptions testSparkPipelineOptions = + PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); + return new SparkRunnerDebugger(testSparkPipelineOptions); + } else { + SparkPipelineOptions sparkPipelineOptions = + PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); + return new SparkRunnerDebugger(sparkPipelineOptions); + } } @Override public SparkPipelineResult run(Pipeline pipeline) { - SparkPipelineResult result; - - SparkPipelineOptions options = (SparkPipelineOptions) pipeline.getOptions(); - JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline"); JavaStreamingContext jssc = new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000)); + + SparkRunner.initAccumulators(options, jsc); + TransformTranslator.Translator translator = new TransformTranslator.Translator(); + SparkNativePipelineVisitor visitor; if (options.isStreaming() || options instanceof TestSparkPipelineOptions @@ -82,8 +95,11 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); visitor = new SparkNativePipelineVisitor(translator, ctxt); } + pipeline.traverseTopologically(visitor); + jsc.stop(); + String debugString = visitor.getDebugString(); LOG.info("Translated Native Spark pipeline:\n" + debugString); return new DebugSparkPipelineResult(debugString); http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index d321f99..e436422 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -34,7 +34,7 @@ import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.stateful.SparkTimerInternals; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; @@ -115,7 +115,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { // clear state of Aggregators, Metrics and Watermarks if exists. AggregatorsAccumulator.clear(); - SparkMetricsContainer.clear(); + MetricsAccumulator.clear(); GlobalWatermarkHolder.clear(); LOG.info("About to run test pipeline " + testSparkPipelineOptions.getJobName()); http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java index 261c327..b8fc81b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.aggregators; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import java.io.IOException; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.hadoop.fs.FileSystem; @@ -40,30 +41,48 @@ import org.slf4j.LoggerFactory; public class AggregatorsAccumulator { private static final Logger LOG = LoggerFactory.getLogger(AggregatorsAccumulator.class); + private static final String ACCUMULATOR_NAME = "Beam.Aggregators"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "aggregators"; - private static volatile Accumulator<NamedAggregators> instance; + private static volatile Accumulator<NamedAggregators> instance = null; private static volatile FileSystem fileSystem; private static volatile Path checkpointFilePath; - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - static Accumulator<NamedAggregators> getInstance( - JavaSparkContext jsc, - Optional<CheckpointDir> checkpointDir) { + /** + * Init aggregators accumulator if it has not been initiated. This method is idempotent. 
+ */ + public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) { if (instance == null) { synchronized (AggregatorsAccumulator.class) { if (instance == null) { - instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam()); - if (checkpointDir.isPresent()) { - recoverValueFromCheckpoint(jsc, checkpointDir.get()); + Optional<CheckpointDir> maybeCheckpointDir = + opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) + : Optional.<CheckpointDir>absent(); + Accumulator<NamedAggregators> accumulator = + jsc.sc().accumulator(new NamedAggregators(), ACCUMULATOR_NAME, new AggAccumParam()); + if (maybeCheckpointDir.isPresent()) { + Optional<NamedAggregators> maybeRecoveredValue = + recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get()); + if (maybeRecoveredValue.isPresent()) { + accumulator.setValue(maybeRecoveredValue.get()); + } } + instance = accumulator; } } + LOG.info("Instantiated aggregators accumulator: " + instance.value()); + } + } + + public static Accumulator<NamedAggregators> getInstance() { + if (instance == null) { + throw new IllegalStateException("Aggregators accumulator has not been instantiated"); + } else { + return instance; } - } - private static void recoverValueFromCheckpoint( + private static Optional<NamedAggregators> recoverValueFromCheckpoint( JavaSparkContext jsc, CheckpointDir checkpointDir) { try { @@ -72,14 +91,15 @@ public class AggregatorsAccumulator { fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration()); NamedAggregators recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath); if (recoveredValue != null) { - LOG.info("Recovered accumulators from checkpoint: " + recoveredValue); - instance.setValue(recoveredValue); + LOG.info("Recovered aggregators from checkpoint."); + return Optional.of(recoveredValue); } else { LOG.info("No accumulator checkpoint found."); } } catch (Exception e) { throw new RuntimeException("Failure while reading accumulator checkpoint.", e); } + return Optional.absent(); } private static void checkpoint() throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 131b761..1da196b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -18,19 +18,16 @@ package org.apache.beam.runners.spark.aggregators; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; -import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.spark.Accumulator; -import org.apache.spark.api.java.JavaSparkContext; /** * A utility class for handling Beam {@link Aggregator}s.
@@ -64,41 +61,14 @@ public class SparkAggregators { } /** - * Retrieves the {@link NamedAggregators} instance using the provided Spark context. - * - * @param jsc a Spark context to be used in order to retrieve the name - * {@link NamedAggregators} instance - */ - public static Accumulator<NamedAggregators> getNamedAggregators(JavaSparkContext jsc) { - return getOrCreateNamedAggregators(jsc, Optional.<CheckpointDir>absent()); - } - - /** - * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark context. - * - * @param jsc a Spark context to be used in order to retrieve the name - * {@link NamedAggregators} instance - * @param checkpointDir checkpoint dir (optional, for streaming pipelines) - * @return a {@link NamedAggregators} instance - */ - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - public static Accumulator<NamedAggregators> getOrCreateNamedAggregators( - JavaSparkContext jsc, - Optional<CheckpointDir> checkpointDir) { - return AggregatorsAccumulator.getInstance(jsc, checkpointDir); - } - - /** * Retrieves the value of an aggregator from a SparkContext instance. * * @param aggregator The aggregator whose value to retrieve - * @param javaSparkContext The SparkContext instance * @param <T> The type of the aggregator's output * @return The value of the aggregator */ - public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T> aggregator, - final JavaSparkContext javaSparkContext) { - return valueOf(getNamedAggregators(javaSparkContext), aggregator); + public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T> aggregator) { + return valueOf(AggregatorsAccumulator.getInstance(), aggregator); } /** @@ -109,10 +79,8 @@ public class SparkAggregators { * @param <T> Type of object to be returned. * @return The value of the aggregator. */ - public static <T> T valueOf(final String name, - final Class<T> typeClass, - final JavaSparkContext javaSparkContext) { - return valueOf(getNamedAggregators(javaSparkContext), name, typeClass); + public static <T> T valueOf(final String name, final Class<T> typeClass) { + return valueOf(AggregatorsAccumulator.getInstance(), name, typeClass); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java index b3880e8..919e6f2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java @@ -28,19 +28,20 @@ import org.apache.spark.metrics.source.Source; * wrapping an underlying {@link NamedAggregators} instance. 
*/ public class AggregatorMetricSource implements Source { + private static final String METRIC_NAME = "Aggregators"; - private final String sourceName; + private final String name; private final MetricRegistry metricRegistry = new MetricRegistry(); - public AggregatorMetricSource(final String appName, final NamedAggregators aggregators) { - sourceName = appName; - metricRegistry.register("Beam.Aggregators", AggregatorMetric.of(aggregators)); + public AggregatorMetricSource(final String name, final NamedAggregators aggregators) { + this.name = name; + metricRegistry.register(METRIC_NAME, AggregatorMetric.of(aggregators)); } @Override public String sourceName() { - return sourceName; + return name; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index 9d48289..1153db6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.metrics; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import java.io.IOException; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.hadoop.fs.FileSystem; @@ -40,27 +41,37 @@ import org.slf4j.LoggerFactory; public class MetricsAccumulator { private static final Logger LOG = LoggerFactory.getLogger(MetricsAccumulator.class); + private static final String ACCUMULATOR_NAME = "Beam.Metrics"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics"; private static volatile Accumulator<SparkMetricsContainer> instance = null; private static volatile FileSystem fileSystem; private static volatile Path checkpointFilePath; - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - public static void init( - JavaSparkContext jsc, - Optional<CheckpointDir> checkpointDir) { + /** + * Init metrics accumulator if it has not been initiated. This method is idempotent. + */ + public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) { if (instance == null) { synchronized (MetricsAccumulator.class) { if (instance == null) { - SparkMetricsContainer initialValue = new SparkMetricsContainer(); - instance = jsc.sc().accumulator(initialValue, "Beam.Metrics", - new MetricsAccumulatorParam()); - if (checkpointDir.isPresent()) { - recoverValueFromCheckpoint(jsc, checkpointDir.get()); + Optional<CheckpointDir> maybeCheckpointDir = + opts.isStreaming() ? 
Optional.of(new CheckpointDir(opts.getCheckpointDir())) + : Optional.<CheckpointDir>absent(); + Accumulator<SparkMetricsContainer> accumulator = + jsc.sc().accumulator(new SparkMetricsContainer(), ACCUMULATOR_NAME, + new MetricsAccumulatorParam()); + if (maybeCheckpointDir.isPresent()) { + Optional<SparkMetricsContainer> maybeRecoveredValue = + recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get()); + if (maybeRecoveredValue.isPresent()) { + accumulator.setValue(maybeRecoveredValue.get()); + } } + instance = accumulator; } } + LOG.info("Instantiated metrics accumulator: " + instance.value()); } } @@ -72,7 +83,7 @@ public class MetricsAccumulator { } } - private static void recoverValueFromCheckpoint( + private static Optional<SparkMetricsContainer> recoverValueFromCheckpoint( JavaSparkContext jsc, CheckpointDir checkpointDir) { try { @@ -81,18 +92,19 @@ public class MetricsAccumulator { fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration()); SparkMetricsContainer recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath); if (recoveredValue != null) { - LOG.info("Recovered metrics from checkpoint: " + recoveredValue); - instance.setValue(recoveredValue); + LOG.info("Recovered metrics from checkpoint."); + return Optional.of(recoveredValue); } else { LOG.info("No metrics checkpoint found."); } } catch (Exception e) { throw new RuntimeException("Failure while reading metrics checkpoint.", e); } + return Optional.absent(); } @VisibleForTesting - static void clear() { + public static void clear() { synchronized (MetricsAccumulator.class) { instance = null; } http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java index 24231c3..9cab66d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java @@ -28,19 +28,20 @@ import org.apache.spark.metrics.source.Source; * wrapping an underlying {@link SparkMetricsContainer} instance. 
*/ public class SparkBeamMetricSource implements Source { + private static final String METRIC_NAME = "Metrics"; - private final String sourceName; + private final String name; private final MetricRegistry metricRegistry = new MetricRegistry(); - public SparkBeamMetricSource(final String appName) { - sourceName = appName; - metricRegistry.register("Beam.Metrics", new SparkBeamMetric()); + public SparkBeamMetricSource(final String name) { + this.name = name; + metricRegistry.register(METRIC_NAME, new SparkBeamMetric()); } @Override public String sourceName() { - return sourceName; + return name; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java index 7a4b222..d376ce3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.metrics; -import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -66,11 +65,15 @@ public class SparkMetricsContainer implements Serializable { } static Collection<MetricUpdate<Long>> getCounters() { - return getInstance().counters.values(); + SparkMetricsContainer sparkMetricsContainer = getInstance(); + sparkMetricsContainer.materialize(); + return sparkMetricsContainer.counters.values(); } static Collection<MetricUpdate<DistributionData>> getDistributions() { - return getInstance().distributions.values(); + SparkMetricsContainer sparkMetricsContainer = getInstance(); + sparkMetricsContainer.materialize(); + return sparkMetricsContainer.distributions.values(); } SparkMetricsContainer update(SparkMetricsContainer other) { @@ -141,12 +144,4 @@ public class SparkMetricsContainer implements Serializable { } return sb.toString(); } - - @VisibleForTesting - public static void clear() { - try { - MetricsAccumulator.clear(); - } catch (IllegalStateException ignored) { - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 44b4039..8d1b82e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -37,8 +37,8 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; import 
org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; import org.apache.beam.runners.spark.io.hadoop.HadoopIO; @@ -138,8 +138,7 @@ public final class TransformTranslator { ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD(); @SuppressWarnings("unchecked") final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - final Accumulator<NamedAggregators> accum = - SparkAggregators.getNamedAggregators(context.getSparkContext()); + final Accumulator<NamedAggregators> accum = AggregatorsAccumulator.getInstance(); @SuppressWarnings("unchecked") final WindowingStrategy<?, W> windowingStrategy = (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy(); @@ -362,9 +361,7 @@ public final class TransformTranslator { ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - JavaSparkContext jsc = context.getSparkContext(); - Accumulator<NamedAggregators> aggAccum = - SparkAggregators.getNamedAggregators(jsc); + Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = @@ -395,9 +392,7 @@ public final class TransformTranslator { ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - JavaSparkContext jsc = context.getSparkContext(); - Accumulator<NamedAggregators> aggAccum = - SparkAggregators.getNamedAggregators(jsc); + Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index ffa8e69..7048be6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -79,6 +79,9 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); + // We must first init accumulators since translators expect them to be instantiated. 
+ SparkRunner.initAccumulators(options, jsc); + ctxt = new EvaluationContext(jsc, pipeline, jssc); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 8a05fbb..2744169 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -32,8 +32,8 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; +import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; @@ -92,7 +92,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; - /** * Supports translation between a Beam transform, and Spark's operations on DStreams. */ @@ -394,8 +393,7 @@ public final class StreamingTransformTranslator { public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd) throws Exception { final JavaSparkContext jsc = new JavaSparkContext(rdd.context()); - final Accumulator<NamedAggregators> aggAccum = - SparkAggregators.getNamedAggregators(jsc); + final Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); final Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = @@ -444,9 +442,7 @@ public final class StreamingTransformTranslator { public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call( JavaRDD<WindowedValue<InputT>> rdd) throws Exception { String stepName = context.getCurrentTransform().getFullName(); - JavaSparkContext jsc = new JavaSparkContext(rdd.context()); - final Accumulator<NamedAggregators> aggAccum = - SparkAggregators.getNamedAggregators(jsc); + final Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); final Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index a192807..dbd8cac 100644 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -27,14 +27,11 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; @@ -86,24 +83,18 @@ public class NamedAggregatorsTest { @Test public void testNamedAggregators() throws Exception { - - // don't reuse context in this test, as is tends to mess up Spark's MetricsSystem thread-safety - System.setProperty("beam.spark.test.reuseSparkContext", "false"); - assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); runPipeline(); assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d)); - } @Test public void testNonExistingAggregatorName() throws Exception { - final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - final Long valueOf = - SparkAggregators.valueOf( - "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options)); + runPipeline(); + + final Long valueOf = SparkAggregators.valueOf("myMissingAggregator", Long.class); assertThat(valueOf, is(nullValue())); } http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index bc22980..ce502d6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; @@ -171,7 +171,7 @@ public class ResumeFromCheckpointStreamingTest { //- clear state. AggregatorsAccumulator.clear(); - SparkMetricsContainer.clear(); + MetricsAccumulator.clear(); GlobalWatermarkHolder.clear(); //- write a bit more.
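As the test changes above illustrate, callers now read aggregator values through the singleton accumulator instead of passing a JavaSparkContext around. A hedged usage sketch of the new, context-free lookup (pipeline construction elided; the "emptyLines" aggregator name and the helper method are hypothetical, assuming a Pipeline bound to SparkRunner whose DoFn registered that aggregator):

    import org.apache.beam.runners.spark.SparkPipelineResult;
    import org.apache.beam.sdk.Pipeline;

    // Hypothetical helper: run the pipeline, then read an aggregator value
    // using the new signature from this patch (no JavaSparkContext argument;
    // the value is resolved via AggregatorsAccumulator.getInstance() internally).
    static Long readEmptyLines(Pipeline pipeline) {
      SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
      result.waitUntilFinish();
      return result.getAggregatorValue("emptyLines", Long.class);
    }

Note that the lookup returns null for an unknown name (see testNonExistingAggregatorName above), and getInstance() throws IllegalStateException if initAccumulators was never called, so pipelines must run through SparkRunner (or SparkRunnerDebugger) before values are queried.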