[BEAM-2670] Fixes SparkRuntimeContext.getPipelineOptions(). It used a global variable to store the deserialized options, so even if several instances of SparkRuntimeContext were created with different PipelineOptions, they would all return the same value: the options of whichever instance was asked first.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff4b36c8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff4b36c8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff4b36c8 Branch: refs/heads/master Commit: ff4b36c8ae1bd5e436ad63a32997273c8b4a97fe Parents: 0a358c7 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Jul 27 13:05:23 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Wed Aug 2 11:04:50 2017 -0700 ---------------------------------------------------------------------- .../spark/translation/EvaluationContext.java | 2 +- .../spark/translation/SparkRuntimeContext.java | 48 ++++++++------------ .../translation/SparkRuntimeContextTest.java | 2 +- 3 files changed, 22 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 0c6c4d1..23e430a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -65,7 +65,7 @@ public class EvaluationContext { this.jsc = jsc; this.pipeline = pipeline; this.options = options; - this.runtime = new SparkRuntimeContext(pipeline, options); + this.runtime = new SparkRuntimeContext(options); } public EvaluationContext( http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index f3fe99c..6361bb2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -21,11 +21,12 @@ package org.apache.beam.runners.spark.translation; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import java.io.IOException; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -34,11 +35,16 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; * data flow program is launched. 
*/ public class SparkRuntimeContext implements Serializable { - private final String serializedPipelineOptions; + private final Supplier<PipelineOptions> optionsSupplier; private transient CoderRegistry coderRegistry; - SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) { - this.serializedPipelineOptions = serializePipelineOptions(options); + SparkRuntimeContext(PipelineOptions options) { + String serializedPipelineOptions = serializePipelineOptions(options); + this.optionsSupplier = + Suppliers.memoize( + Suppliers.compose( + new DeserializeOptions(), + Suppliers.ofInstance(serializedPipelineOptions))); } /** @@ -59,16 +65,8 @@ public class SparkRuntimeContext implements Serializable { } } - private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) { - try { - return createMapper().readValue(serializedPipelineOptions, PipelineOptions.class); - } catch (IOException e) { - throw new IllegalStateException("Failed to deserialize the pipeline options.", e); - } - } - public PipelineOptions getPipelineOptions() { - return PipelineOptionsHolder.getOrInit(serializedPipelineOptions); + return optionsSupplier.get(); } public CoderRegistry getCoderRegistry() { @@ -78,21 +76,15 @@ public class SparkRuntimeContext implements Serializable { return coderRegistry; } - private static class PipelineOptionsHolder { - // on executors, this should deserialize once. - private static transient volatile PipelineOptions pipelineOptions = null; - - static PipelineOptions getOrInit(String serializedPipelineOptions) { - if (pipelineOptions == null) { - synchronized (PipelineOptionsHolder.class) { - if (pipelineOptions == null) { - pipelineOptions = deserializePipelineOptions(serializedPipelineOptions); - } - } - // Register standard FileSystems. 
- FileSystems.setDefaultPipelineOptions(pipelineOptions); + private static class DeserializeOptions + implements Function<String, PipelineOptions>, Serializable { + @Override + public PipelineOptions apply(String options) { + try { + return createMapper().readValue(options, PipelineOptions.class); + } catch (IOException e) { + throw new IllegalStateException("Failed to deserialize the pipeline options.", e); } - return pipelineOptions; } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java index e8f578a..456056a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java @@ -105,7 +105,7 @@ public class SparkRuntimeContextTest { .as(JacksonIncompatibleOptions.class); options.setRunner(CrashingRunner.class); Pipeline p = Pipeline.create(options); - SparkRuntimeContext context = new SparkRuntimeContext(p, options); + SparkRuntimeContext context = new SparkRuntimeContext(options); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {