Repository: beam Updated Branches: refs/heads/master 410534b1f -> 1fd52f53c
[BEAM-1556] Make PipelineOptions a lazy-singleton and init IOs as part of it. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4dda585c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4dda585c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4dda585c Branch: refs/heads/master Commit: 4dda585cda61a775e2d616fa5c25698f490b9cd3 Parents: 410534b Author: Sela <ans...@paypal.com> Authored: Mon Mar 6 11:17:00 2017 +0200 Committer: Davor Bonaci <da...@google.com> Committed: Mon Mar 6 22:37:41 2017 -0800 ---------------------------------------------------------------------- .../spark/translation/SparkRuntimeContext.java | 29 ++++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4dda585c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 9c3d79f..4ccfead 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.spark.Accumulator; /** @@ -40,12 +41,10 @@ import org.apache.spark.Accumulator; */ public class SparkRuntimeContext implements Serializable { private final String serializedPipelineOptions; + private transient CoderRegistry coderRegistry; - /** - * Map fo names to Beam aggregators. - */ + // map for names to Beam aggregators. private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>(); - private transient CoderRegistry coderRegistry; SparkRuntimeContext(Pipeline pipeline) { this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); @@ -67,8 +66,8 @@ public class SparkRuntimeContext implements Serializable { } } - public synchronized PipelineOptions getPipelineOptions() { - return deserializePipelineOptions(serializedPipelineOptions); + public PipelineOptions getPipelineOptions() { + return PipelineOptionsHolder.getOrInit(serializedPipelineOptions); } /** @@ -118,6 +117,24 @@ public class SparkRuntimeContext implements Serializable { return coderRegistry; } + private static class PipelineOptionsHolder { + // on executors, this should deserialize once. + private static transient volatile PipelineOptions pipelineOptions = null; + + static PipelineOptions getOrInit(String serializedPipelineOptions) { + if (pipelineOptions == null) { + synchronized (PipelineOptionsHolder.class) { + if (pipelineOptions == null) { + pipelineOptions = deserializePipelineOptions(serializedPipelineOptions); + } + } + // register IO factories. + IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + } + return pipelineOptions; + } + } + /** * Initialize spark aggregators exactly once. *