Repository: beam
Updated Branches:
  refs/heads/master 343176c00 -> 847e4e9f0
[BEAM-648] Persist and restore Aggregator values in case of recovery from failure

Added javadoc and minor refactor

Moved creation of beam checkpoint dir

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62f9e7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62f9e7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62f9e7b1

Branch: refs/heads/master
Commit: 62f9e7b1e1a8a8f2317e3508ccce615f2b30d4f6
Parents: 343176c
Author: Aviem Zur <aviem...@gmail.com>
Authored: Sun Jan 22 14:30:44 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Jan 30 22:53:34 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 21 ++++-
 .../spark/aggregators/AccumulatorSingleton.java | 96 ++++++++++++++++++--
 .../spark/aggregators/SparkAggregators.java     | 20 +++-
 .../translation/streaming/CheckpointDir.java    | 69 ++++++++++++++
 .../SparkRunnerStreamingContextFactory.java     | 44 ++++++---
 .../ResumeFromCheckpointStreamingTest.java      |  5 +-
 6 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 92c07bb..578ed21 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,12 +18,14 @@
 package org.apache.beam.runners.spark;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
@@ -32,6 +34,7 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -54,6 +57,7 @@ import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,7 +134,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   }
 
   private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) {
-    final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(jsc);
+    Optional<CheckpointDir> maybeCheckpointDir =
+        opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+            : Optional.<CheckpointDir>absent();
+    final Accumulator<NamedAggregators> accum =
+        SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
     final NamedAggregators initialValue = accum.value();
 
     if (opts.getEnableSparkMetricSinks()) {
@@ -154,10 +162,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     detectTranslationMode(pipeline);
 
     if (mOptions.isStreaming()) {
+      CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
       final SparkRunnerStreamingContextFactory contextFactory =
-          new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+          new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
       final JavaStreamingContext jssc =
-          JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+          JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
+              contextFactory);
+
+      // Checkpoint aggregator values
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
 
       startPipeline = executorService.submit(new Runnable() {
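Condensed, the streaming start-up path above does two things: build (or recover) the context through JavaStreamingContext.getOrCreate, then attach the listener that snapshots aggregator values. A minimal standalone sketch of that flow, assuming Spark 1.6 and the AccumulatorSingleton listener added in this commit (app name, master, batch interval and checkpoint root are illustrative):

    import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
    import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;

    public class RecoveryWiring {
      public static void main(String[] args) {
        final String sparkCheckpointDir = "hdfs:///checkpoints/my-job/spark-checkpoint";

        // Cold start: Spark invokes the factory. Restart: Spark rebuilds the
        // context (and its DStream graph) from the checkpoint, skipping the factory.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
            sparkCheckpointDir,
            new JavaStreamingContextFactory() {
              @Override
              public JavaStreamingContext create() {
                SparkConf conf = new SparkConf()
                    .setAppName("recovery-wiring")
                    .setMaster("local[2]"); // local master, just for the sketch
                JavaStreamingContext created = new JavaStreamingContext(conf, new Duration(1000));
                created.checkpoint(sparkCheckpointDir);
                return created;
              }
            });

        // Runs on both paths, so aggregator snapshots are taken after every
        // completed batch whether this is a first run or a recovery.
        jssc.addStreamingListener(
            new JavaStreamingListenerWrapper(
                new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));

        jssc.start();
        jssc.awaitTermination();
      }
    }

Note the listener is registered outside the factory: getOrCreate skips the factory on recovery, so anything that must happen on every start has to run after getOrCreate returns, exactly as SparkRunner does above.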
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
index 883830e..473750c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
@@ -19,35 +19,119 @@
 package org.apache.beam.runners.spark.aggregators;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton.
  * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
  */
-class AccumulatorSingleton {
+public class AccumulatorSingleton {
+  private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class);
+
+  private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators";
 
-  private static volatile Accumulator<NamedAggregators> instance = null;
+  private static volatile Accumulator<NamedAggregators> instance;
+  private static volatile FileSystem fileSystem;
+  private static volatile Path checkpointPath;
+  private static volatile Path tempCheckpointPath;
+  private static volatile Path backupCheckpointPath;
 
-  static Accumulator<NamedAggregators> getInstance(JavaSparkContext jsc) {
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  static Accumulator<NamedAggregators> getInstance(
+      JavaSparkContext jsc,
+      Optional<CheckpointDir> checkpointDir) {
     if (instance == null) {
       synchronized (AccumulatorSingleton.class) {
         if (instance == null) {
-          //TODO: currently when recovering from checkpoint, Spark does not recover the
-          // last known Accumulator value. The SparkRunner should be able to persist and recover
-          // the NamedAggregators in order to recover Aggregators as well.
           instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam());
+          if (checkpointDir.isPresent()) {
+            recoverValueFromCheckpoint(jsc, checkpointDir.get());
+          }
         }
       }
     }
     return instance;
  }
 
+  private static void recoverValueFromCheckpoint(
+      JavaSparkContext jsc,
+      CheckpointDir checkpointDir) {
+    FSDataInputStream is = null;
+    try {
+      Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+      checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME);
+      tempCheckpointPath = checkpointPath.suffix(".tmp");
+      backupCheckpointPath = checkpointPath.suffix(".bak");
+      fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration());
+      if (fileSystem.exists(checkpointPath)) {
+        is = fileSystem.open(checkpointPath);
+      } else if (fileSystem.exists(backupCheckpointPath)) {
+        is = fileSystem.open(backupCheckpointPath);
+      }
+      if (is != null) {
+        ObjectInputStream objectInputStream = new ObjectInputStream(is);
+        NamedAggregators recoveredValue =
+            (NamedAggregators) objectInputStream.readObject();
+        objectInputStream.close();
+        LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
+        instance.setValue(recoveredValue);
+      } else {
+        LOG.info("No accumulator checkpoint found.");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while reading accumulator checkpoint.", e);
+    }
+  }
+
+  private static void checkpoint() throws IOException {
+    if (checkpointPath != null) {
+      if (fileSystem.exists(checkpointPath)) {
+        if (fileSystem.exists(backupCheckpointPath)) {
+          fileSystem.delete(backupCheckpointPath, false);
+        }
+        fileSystem.rename(checkpointPath, backupCheckpointPath);
+      }
+      FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true);
+      ObjectOutputStream oos = new ObjectOutputStream(os);
+      oos.writeObject(instance.value());
+      oos.close();
+      fileSystem.rename(tempCheckpointPath, checkpointPath);
+    }
+  }
+
   @VisibleForTesting
   static void clear() {
     synchronized (AccumulatorSingleton.class) {
       instance = null;
     }
   }
+
+  /**
+   * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance.
+   */
+  public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener {
+    @Override
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+      try {
+        checkpoint();
+      } catch (IOException e) {
+        LOG.error("Failed to checkpoint accumulator singleton.", e);
+      }
+    }
+  }
 }
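The file handling in checkpoint() and recoverValueFromCheckpoint() is a temp-then-rename rotation: write the new snapshot to a .tmp file, demote the previous snapshot to .bak, and read back from the primary file or, failing that, the backup. A self-contained sketch of the same scheme against the Hadoop FileSystem API (the demo path and String payload are illustrative; the runner serializes NamedAggregators):

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class SnapshotRotation {

      /** Writes a new snapshot, keeping the previous one as a ".bak" fallback. */
      static void write(FileSystem fs, Path path, Object value) throws IOException {
        Path tmp = path.suffix(".tmp");
        Path bak = path.suffix(".bak");
        if (fs.exists(path)) {
          // Demote the current snapshot to backup so a crash mid-write still
          // leaves one readable copy behind.
          if (fs.exists(bak)) {
            fs.delete(bak, false);
          }
          fs.rename(path, bak);
        }
        // Write to a temp file first; the final rename is atomic on HDFS.
        try (FSDataOutputStream os = fs.create(tmp, true);
            ObjectOutputStream oos = new ObjectOutputStream(os)) {
          oos.writeObject(value);
        }
        fs.rename(tmp, path);
      }

      /** Reads the latest snapshot, falling back to the backup; null if neither exists. */
      static Object read(FileSystem fs, Path path) throws IOException, ClassNotFoundException {
        Path bak = path.suffix(".bak");
        Path source = fs.exists(path) ? path : (fs.exists(bak) ? bak : null);
        if (source == null) {
          return null;
        }
        try (FSDataInputStream is = fs.open(source);
            ObjectInputStream ois = new ObjectInputStream(is)) {
          return ois.readObject();
        }
      }

      public static void main(String[] args) throws Exception {
        Path path = new Path("file:///tmp/beam-checkpoint-demo/beam_aggregators"); // illustrative
        FileSystem fs = path.getFileSystem(new Configuration());
        fs.mkdirs(path.getParent());
        write(fs, path, "snapshot-1");
        write(fs, path, "snapshot-2"); // "snapshot-1" now lives in the .bak file
        System.out.println(read(fs, path)); // prints snapshot-2
      }
    }

Because each rename is atomic on HDFS, a crash at any point leaves at least one intact snapshot: either the old file (still primary or already demoted to .bak) or the fully written new one.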
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index fa5c8d1..245c69e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -18,12 +18,14 @@
 package org.apache.beam.runners.spark.aggregators;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -66,10 +68,24 @@ public class SparkAggregators {
    *
    * @param jsc a Spark context to be used in order to retrieve the name
    *            {@link NamedAggregators} instance
-   * @return a {@link NamedAggregators} instance
    */
   public static Accumulator<NamedAggregators> getNamedAggregators(JavaSparkContext jsc) {
-    return AccumulatorSingleton.getInstance(jsc);
+    return getOrCreateNamedAggregators(jsc, Optional.<CheckpointDir>absent());
+  }
+
+  /**
+   * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark context.
+   *
+   * @param jsc a Spark context to be used in order to retrieve the name
+   *            {@link NamedAggregators} instance
+   * @param checkpointDir checkpoint dir (optional, for streaming pipelines)
+   * @return a {@link NamedAggregators} instance
+   */
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public static Accumulator<NamedAggregators> getOrCreateNamedAggregators(
+      JavaSparkContext jsc,
+      Optional<CheckpointDir> checkpointDir) {
+    return AccumulatorSingleton.getInstance(jsc, checkpointDir);
   }
 
   /**
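The two entry points split batch callers from streaming callers. A short sketch of the call-site pattern (the AggregatorAccess class is hypothetical; it restates what SparkRunner#registerMetrics does above):

    import com.google.common.base.Optional;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.aggregators.NamedAggregators;
    import org.apache.beam.runners.spark.aggregators.SparkAggregators;
    import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaSparkContext;

    final class AggregatorAccess {
      // Streaming callers hand over the checkpoint dir so a previous snapshot,
      // if one exists, is restored into the accumulator on first access;
      // batch callers pass absent() and get a fresh accumulator.
      static Accumulator<NamedAggregators> resolve(
          JavaSparkContext jsc, SparkPipelineOptions opts) {
        Optional<CheckpointDir> maybeCheckpointDir = opts.isStreaming()
            ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
            : Optional.<CheckpointDir>absent();
        return SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
      }
    }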
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
new file mode 100644
index 0000000..5b192bd
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Spark checkpoint dir tree.
+ *
+ * {@link SparkPipelineOptions} checkpointDir is used as a root directory under which one directory
+ * is created for Spark's checkpoint and another for Beam's Spark runner's fault tolerance needs.
+ * Spark's checkpoint relies on Hadoop's {@link org.apache.hadoop.fs.FileSystem} and is used for
+ * Beam as well rather than {@link org.apache.beam.sdk.io.FileSystem} to be consistent with Spark.
+ */
+public class CheckpointDir {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointDir.class);
+
+  private static final String SPARK_CHECKPOINT_DIR = "spark-checkpoint";
+  private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint";
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
+
+  private final Path rootCheckpointDir;
+  private final Path sparkCheckpointDir;
+  private final Path beamCheckpointDir;
+
+  public CheckpointDir(String rootCheckpointDir) {
+    if (!rootCheckpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+          + "of failures this job may not recover properly or even at all.", rootCheckpointDir);
+    }
+    LOG.info("Checkpoint dir set to: {}", rootCheckpointDir);
+
+    this.rootCheckpointDir = new Path(rootCheckpointDir);
+    this.sparkCheckpointDir = new Path(rootCheckpointDir, SPARK_CHECKPOINT_DIR);
+    this.beamCheckpointDir = new Path(rootCheckpointDir, BEAM_CHECKPOINT_DIR);
+  }
+
+  public Path getRootCheckpointDir() {
+    return rootCheckpointDir;
+  }
+
+  public Path getSparkCheckpointDir() {
+    return sparkCheckpointDir;
+  }
+
+  public Path getBeamCheckpointDir() {
+    return beamCheckpointDir;
+  }
+}
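Usage is a single constructor call that fans out into the two sub-directories (the root URI below is illustrative):

    import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;

    public class CheckpointDirDemo {
      public static void main(String[] args) {
        CheckpointDir dirs = new CheckpointDir("hdfs:///checkpoints/my-beam-job");
        System.out.println(dirs.getRootCheckpointDir());  // the root itself
        System.out.println(dirs.getSparkCheckpointDir()); // <root>/spark-checkpoint
        System.out.println(dirs.getBeamCheckpointDir());  // <root>/beam-checkpoint
      }
    }

One caveat worth noting: KNOWN_RELIABLE_FS_PATTERN is carried over verbatim from SparkRunnerStreamingContextFactory, and Java's String.matches requires the pattern to match the entire input, so "^(hdfs|s3|gs)" never matches a full URI like the one above and the unreliable-filesystem warning fires regardless of scheme; the intent presumably calls for a trailing ".*" or a startsWith check.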
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index d069a11..6d254e1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
 import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
@@ -28,6 +29,8 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,14 +48,18 @@ import org.slf4j.LoggerFactory;
 public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
 
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
+  private final CheckpointDir checkpointDir;
 
-  public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options) {
+  public SparkRunnerStreamingContextFactory(
+      Pipeline pipeline,
+      SparkPipelineOptions options,
+      CheckpointDir checkpointDir) {
     this.pipeline = pipeline;
     this.options = options;
+    this.checkpointDir = checkpointDir;
   }
 
   private EvaluationContext ctxt;
@@ -73,18 +80,12 @@ public class SparkRunnerStreamingContextF
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+
     ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 
-    // set checkpoint dir.
-    String checkpointDir = options.getCheckpointDir();
-    if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
-      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
-          + "of failures this job may not recover properly or even at all.", checkpointDir);
-    }
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    jssc.checkpoint(checkpointDir);
+    checkpoint(jssc);
 
     // register listeners.
     for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) {
@@ -95,7 +96,26 @@ public class SparkRunnerStreamingContextF
     return jssc;
   }
 
-  public EvaluationContext getCtxt() {
-    return ctxt;
+  private void checkpoint(JavaStreamingContext jssc) {
+    Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
+    Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
+    Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+
+    try {
+      FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration());
+      if (!fileSystem.exists(rootCheckpointPath)) {
+        fileSystem.mkdirs(rootCheckpointPath);
+      }
+      if (!fileSystem.exists(sparkCheckpointPath)) {
+        fileSystem.mkdirs(sparkCheckpointPath);
+      }
+      if (!fileSystem.exists(beamCheckpointPath)) {
+        fileSystem.mkdirs(beamCheckpointPath);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create checkpoint dir", e);
+    }
+
+    jssc.checkpoint(sparkCheckpointPath.toString());
   }
 }
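The new checkpoint(JavaStreamingContext) method boils down to an idempotent directory bootstrap before handing Spark its sub-directory. The same logic as a standalone sketch (class name, configuration and root path are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckpointDirBootstrap {
      static void bootstrap(Configuration conf, Path root) throws IOException {
        FileSystem fs = root.getFileSystem(conf);
        Path[] dirs = {
            root, new Path(root, "spark-checkpoint"), new Path(root, "beam-checkpoint")};
        for (Path dir : dirs) {
          // Hadoop's mkdirs behaves like mkdir -p, so the exists() guard in the
          // runner is belt-and-braces; calling mkdirs unconditionally is also safe.
          if (!fs.exists(dir)) {
            fs.mkdirs(dir);
          }
        }
      }

      public static void main(String[] args) throws IOException {
        bootstrap(new Configuration(), new Path("file:///tmp/beam-job-checkpoints"));
      }
    }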
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7346bd9..8280672 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -81,6 +81,7 @@ public class ResumeFromCheckpointStreamingTest {
   );
   private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
   private static final long EXPECTED_AGG_FIRST = 4L;
+  private static final long EXPECTED_AGG_SECOND = 8L;
 
   @Rule
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@@ -141,8 +142,8 @@ public class ResumeFromCheckpointStreamingTest {
     res = runAgain(options);
     long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
     assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", EXPECTED_AGG_FIRST, processedMessages2), processedMessages2,
-        equalTo(EXPECTED_AGG_FIRST));
+        + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2,
+        equalTo(EXPECTED_AGG_SECOND));
   }
 
   private SparkPipelineResult runAgain(SparkPipelineOptions options) {
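The changed assertion is the observable payoff of the commit: the first run processes four messages, so processedMessages reports EXPECTED_AGG_FIRST (4). Before this change, a resumed run restarted the aggregator from zero and the test could only expect 4 again; with the snapshot restored on recovery, the second run's four messages accumulate on top of the first run's four, giving EXPECTED_AGG_SECOND (8).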