Repository: beam Updated Branches: refs/heads/master 99056df36 -> 8a33591d9
[BEAM-1827] Fix use of deprecated Spark APIs in the runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6671b5b6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6671b5b6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6671b5b6 Branch: refs/heads/master Commit: 6671b5b6bae6c2a918481577ca2564bb45e7c280 Parents: 99056df Author: Amit Sela <amitsel...@gmail.com> Authored: Wed Mar 29 10:39:49 2017 +0300 Committer: Amit Sela <amitsel...@gmail.com> Committed: Wed Mar 29 15:56:21 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/spark/SparkPipelineResult.java | 51 +++---- .../apache/beam/runners/spark/SparkRunner.java | 153 +++++++++---------- .../beam/runners/spark/io/SourceDStream.java | 4 +- .../SparkRunnerStreamingContextFactory.java | 43 +++--- 4 files changed, 113 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index b2b2831..d2c5c8e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -38,9 +38,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.joda.time.Duration; -/** - * Represents a Spark pipeline execution result. - */ +/** Represents a Spark pipeline execution result. */ public abstract class SparkPipelineResult implements PipelineResult { protected final Future pipelineExecution; @@ -48,8 +46,7 @@ public abstract class SparkPipelineResult implements PipelineResult { protected PipelineResult.State state; private final SparkMetricResults metricResults = new SparkMetricResults(); - SparkPipelineResult(final Future<?> pipelineExecution, - final JavaSparkContext javaSparkContext) { + SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) { this.pipelineExecution = pipelineExecution; this.javaSparkContext = javaSparkContext; // pipelineExecution is expected to have started executing eagerly. @@ -130,13 +127,10 @@ public abstract class SparkPipelineResult implements PipelineResult { return state; } - /** - * Represents the result of running a batch pipeline. - */ + /** Represents the result of running a batch pipeline. */ static class BatchMode extends SparkPipelineResult { - BatchMode(final Future<?> pipelineExecution, - final JavaSparkContext javaSparkContext) { + BatchMode(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) { super(pipelineExecution, javaSparkContext); } @@ -156,15 +150,13 @@ public abstract class SparkPipelineResult implements PipelineResult { } } - /** - * Represents a streaming Spark pipeline result. - */ + /** Represents a streaming Spark pipeline result. */ static class StreamingMode extends SparkPipelineResult { private final JavaStreamingContext javaStreamingContext; - StreamingMode(final Future<?> pipelineExecution, - final JavaStreamingContext javaStreamingContext) { + StreamingMode( + final Future<?> pipelineExecution, final JavaStreamingContext javaStreamingContext) { super(pipelineExecution, javaStreamingContext.sparkContext()); this.javaStreamingContext = javaStreamingContext; } @@ -176,7 +168,7 @@ public abstract class SparkPipelineResult implements PipelineResult { // calling the StreamingContext's waiter with 0 msec will throw any error that might have // been thrown during the "grace period". try { - javaStreamingContext.awaitTermination(0); + javaStreamingContext.awaitTerminationOrTimeout(0); } catch (Exception e) { throw beamExceptionFrom(e); } finally { @@ -188,24 +180,24 @@ public abstract class SparkPipelineResult implements PipelineResult { } @Override - protected State awaitTermination(final Duration duration) throws ExecutionException, - InterruptedException { + protected State awaitTermination(final Duration duration) + throws ExecutionException, InterruptedException { pipelineExecution.get(); // execution is asynchronous anyway so no need to time-out. javaStreamingContext.awaitTerminationOrTimeout(duration.getMillis()); State terminationState; switch (javaStreamingContext.getState()) { - case ACTIVE: - terminationState = State.RUNNING; - break; - case STOPPED: - terminationState = State.DONE; - break; - default: - terminationState = State.UNKNOWN; - break; - } - return terminationState; + case ACTIVE: + terminationState = State.RUNNING; + break; + case STOPPED: + terminationState = State.DONE; + break; + default: + terminationState = State.UNKNOWN; + break; + } + return terminationState; } } @@ -216,5 +208,4 @@ public abstract class SparkPipelineResult implements PipelineResult { stop(); } } - } http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 97532c4..5b4f73e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -65,40 +65,32 @@ import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * The SparkRunner translate operations defined on a pipeline to a representation - * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run - * a Beam pipeline with the default options of a single threaded spark instance in local mode, - * we would do the following: + * The SparkRunner translate operations defined on a pipeline to a representation executable by + * Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam pipeline + * with the default options of a single threaded spark instance in local mode, we would do the + * following: * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineResult result = (SparkPipelineResult) p.run(); - * } + * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result = + * (SparkPipelineResult) p.run(); } * * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url * we would do the following: * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - * options.setSparkMaster("spark://host:port"); - * SparkPipelineResult result = (SparkPipelineResult) p.run(); - * } + * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options = + * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port"); + * SparkPipelineResult result = (SparkPipelineResult) p.run(); } */ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class); - /** - * Options used in this pipeline runner. - */ + /** Options used in this pipeline runner. */ private final SparkPipelineOptions mOptions; /** - * Creates and returns a new SparkRunner with default options. In particular, against a - * spark instance running in local mode. + * Creates and returns a new SparkRunner with default options. In particular, against a spark + * instance running in local mode. * * @return A pipeline runner with default options. */ @@ -156,11 +148,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { if (mOptions.isStreaming()) { CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir()); - final SparkRunnerStreamingContextFactory contextFactory = + SparkRunnerStreamingContextFactory streamingContextFactory = new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir); final JavaStreamingContext jssc = - JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), - contextFactory); + JavaStreamingContext.getOrCreate( + checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory); // Checkpoint aggregator/metrics values jssc.addStreamingListener( @@ -171,7 +163,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { new MetricsAccumulator.AccumulatorCheckpointingSparkListener())); // register user-defined listeners. - for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) { + for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) { LOG.info("Registered listener {}." + listener.getClass().getSimpleName()); jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener)); } @@ -185,14 +177,16 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { // but this is fine since it is idempotent). initAccumulators(mOptions, jssc.sparkContext()); - startPipeline = executorService.submit(new Runnable() { + startPipeline = + executorService.submit( + new Runnable() { - @Override - public void run() { - LOG.info("Starting streaming pipeline execution."); - jssc.start(); - } - }); + @Override + public void run() { + LOG.info("Starting streaming pipeline execution."); + jssc.start(); + } + }); result = new SparkPipelineResult.StreamingMode(startPipeline, jssc); } else { @@ -206,15 +200,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { initAccumulators(mOptions, jsc); - startPipeline = executorService.submit(new Runnable() { + startPipeline = + executorService.submit( + new Runnable() { - @Override - public void run() { - pipeline.traverseTopologically(new Evaluator(translator, evaluationContext)); - evaluationContext.computeOutputs(); - LOG.info("Batch pipeline execution complete."); - } - }); + @Override + public void run() { + pipeline.traverseTopologically(new Evaluator(translator, evaluationContext)); + evaluationContext.computeOutputs(); + LOG.info("Batch pipeline execution complete."); + } + }); result = new SparkPipelineResult.BatchMode(startPipeline, jsc); } @@ -227,30 +223,28 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } private void registerMetricsSource(String appName) { - final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); - final AggregatorMetricSource aggregatorMetricSource = - new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value()); - final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null); - final CompositeSource compositeSource = - new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(), - aggregatorMetricSource.metricRegistry()); - // re-register the metrics in case of context re-use - metricsSystem.removeSource(compositeSource); - metricsSystem.registerSource(compositeSource); + final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); + final AggregatorMetricSource aggregatorMetricSource = + new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value()); + final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null); + final CompositeSource compositeSource = + new CompositeSource( + appName + ".Beam", + metricsSource.metricRegistry(), + aggregatorMetricSource.metricRegistry()); + // re-register the metrics in case of context re-use + metricsSystem.removeSource(compositeSource); + metricsSystem.registerSource(compositeSource); } - /** - * Init Metrics/Aggregators accumulators. This method is idempotent. - */ + /** Init Metrics/Aggregators accumulators. This method is idempotent. */ public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) { // Init metrics accumulators MetricsAccumulator.init(opts, jsc); AggregatorsAccumulator.init(opts, jsc); } - /** - * Visit the pipeline to determine the translation mode (batch/streaming). - */ + /** Visit the pipeline to determine the translation mode (batch/streaming). */ private void detectTranslationMode(Pipeline pipeline) { TranslationModeDetector detector = new TranslationModeDetector(); pipeline.traverseTopologically(detector); @@ -260,20 +254,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } } - /** - * Evaluator that update/populate the cache candidates. - */ + /** Evaluator that update/populate the cache candidates. */ public static void updateCacheCandidates( - Pipeline pipeline, - SparkPipelineTranslator translator, - EvaluationContext evaluationContext) { - CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext); - pipeline.traverseTopologically(cacheVisitor); + Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) { + CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext); + pipeline.traverseTopologically(cacheVisitor); } - /** - * The translation mode of the Beam Pipeline. - */ + /** The translation mode of the Beam Pipeline. */ enum TranslationMode { /** Uses the batch mode. */ BATCH, @@ -281,9 +269,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { STREAMING } - /** - * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. - */ + /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */ private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class); private static final Collection<Class<? extends PTransform>> UNBOUNDED_INPUTS = @@ -315,14 +301,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } } - /** - * Traverses the pipeline to populate the candidates for caching. - */ + /** Traverses the pipeline to populate the candidates for caching. */ static class CacheVisitor extends Evaluator { protected CacheVisitor( - SparkPipelineTranslator translator, - EvaluationContext evaluationContext) { + SparkPipelineTranslator translator, EvaluationContext evaluationContext) { super(translator, evaluationContext); } @@ -345,9 +328,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } } - /** - * Evaluator on the pipeline. - */ + /** Evaluator on the pipeline. */ @SuppressWarnings("WeakerAccess") public static class Evaluator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); @@ -399,7 +380,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } // defer if sideInputs are defined. if (hasSideInput) { - LOG.info("Deferring combine transformation {} for job {}", transform, + LOG.info( + "Deferring combine transformation {} for job {}", + transform, ctxt.getPipeline().getOptions().getJobName()); return true; } @@ -412,14 +395,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { doVisitTransform(node); } - <TransformT extends PTransform<? super PInput, POutput>> void - doVisitTransform(TransformHierarchy.Node node) { + <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform( + TransformHierarchy.Node node) { @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass(); - @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator = - translate(node, transform, transformClass); + @SuppressWarnings("unchecked") + TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass); LOG.info("Evaluating {}", transform); AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(); ctxt.setCurrentTransform(appliedTransform); @@ -432,7 +415,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { * translate with the proper translator. */ protected <TransformT extends PTransform<? super PInput, POutput>> - TransformEvaluator<TransformT> translate( + TransformEvaluator<TransformT> translate( TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) { //--- determine if node is bounded/unbounded. // usually, the input determines if the PCollection to apply the next transformation to @@ -449,7 +432,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { LOG.debug("Translating {} as {}", transform, isNodeBounded); return isNodeBounded.equals(PCollection.IsBounded.BOUNDED) ? translator.translateBounded(transformClass) - : translator.translateUnbounded(transformClass); + : translator.translateUnbounded(transformClass); } protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) { @@ -458,7 +441,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED // while BOUNDED + UNBOUNDED = UNBOUNDED. PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED; - for (TaggedPValue pValue: pValues) { + for (TaggedPValue pValue : pValues) { if (pValue.getValue() instanceof PCollection) { isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded()); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 3f2c10a..b7bfeed 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -87,7 +87,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis()); // set initial parallelism once. - this.initialParallelism = ssc().sc().defaultParallelism(); + this.initialParallelism = ssc().sparkContext().defaultParallelism(); checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero."); this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords(); @@ -106,7 +106,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) { RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>( - ssc().sc(), + ssc().sparkContext(), runtimeContext, createMicrobatchSource(), numPartitions); http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 98521e9..2dd18f3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -32,47 +32,47 @@ import org.apache.beam.sdk.Pipeline; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function0; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A {@link JavaStreamingContext} factory for resilience. - * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a> + * + * @see <a + * href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a> */ -public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory { +public class SparkRunnerStreamingContextFactory implements Function0<JavaStreamingContext> { private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class); - private final Pipeline pipeline; - private final SparkPipelineOptions options; - private final CheckpointDir checkpointDir; + // set members as transient to satisfy findbugs and since this only runs in driver. + private final transient Pipeline pipeline; + private final transient SparkPipelineOptions options; + private final transient CheckpointDir checkpointDir; public SparkRunnerStreamingContextFactory( - Pipeline pipeline, - SparkPipelineOptions options, - CheckpointDir checkpointDir) { + Pipeline pipeline, SparkPipelineOptions options, CheckpointDir checkpointDir) { this.pipeline = pipeline; this.options = options; this.checkpointDir = checkpointDir; } - private EvaluationContext ctxt; - @Override - public JavaStreamingContext create() { + public JavaStreamingContext call() throws Exception { LOG.info("Creating a new Spark Streaming Context"); // validate unbounded read properties. - checkArgument(options.getMinReadTimeMillis() < options.getBatchIntervalMillis(), + checkArgument( + options.getMinReadTimeMillis() < options.getBatchIntervalMillis(), "Minimum read time has to be less than batch time."); - checkArgument(options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1, + checkArgument( + options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1, "Read time percentage is bound to (0, 1)."); - SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator( - new TransformTranslator.Translator()); + SparkPipelineTranslator translator = + new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); Duration batchDuration = new Duration(options.getBatchIntervalMillis()); LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds()); @@ -82,24 +82,25 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF // We must first init accumulators since translators expect them to be instantiated. SparkRunner.initAccumulators(options, jsc); - ctxt = new EvaluationContext(jsc, pipeline, jssc); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); // update cache candidates SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); - checkpoint(jssc); + checkpoint(jssc, checkpointDir); return jssc; } - private void checkpoint(JavaStreamingContext jssc) { + private void checkpoint(JavaStreamingContext jssc, CheckpointDir checkpointDir) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir(); Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); try { - FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration()); + FileSystem fileSystem = + rootCheckpointPath.getFileSystem(jssc.sparkContext().hadoopConfiguration()); if (!fileSystem.exists(rootCheckpointPath)) { fileSystem.mkdirs(rootCheckpointPath); }