Repository: beam
Updated Branches:
  refs/heads/master 0a358c780 -> 339976c9f


http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1385e07..1263618 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -38,7 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
@@ -108,11 +108,11 @@ public class SparkGroupAlsoByWindowViaWindowSet {
           final Coder<K> keyCoder,
           final Coder<WindowedValue<InputT>> wvCoder,
           final WindowingStrategy<?, W> windowingStrategy,
-          final SparkRuntimeContext runtimeContext,
+          final SerializablePipelineOptions options,
           final List<Integer> sourceIds) {
 
     final long batchDurationMillis =
-        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
+        options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis();
     final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder);
     final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder();
     final Coder<? extends BoundedWindow> wCoder =
@@ -123,7 +123,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
 
     long checkpointDurationMillis =
-        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class)
+        options.get().as(SparkPipelineOptions.class)
             .getCheckpointDurationMillis();
 
     // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
@@ -268,7 +268,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                               outputHolder,
                              new UnsupportedSideInputReader("GroupAlsoByWindow"),
                               reduceFn,
-                              runtimeContext.getPipelineOptions());
+                              options.get());
 
                       outputHolder.clear(); // clear before potential use.
                       if (!seq.isEmpty()) {
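
Note: the hunks above show the substitution applied mechanically across the rest of this commit: the shipped SparkRuntimeContext becomes a SerializablePipelineOptions, and each runtimeContext.getPipelineOptions() becomes options.get(). A minimal sketch of the new access pattern, assuming only the API surface visible in this diff (the one-argument constructor and get()); the class and variable names here are illustrative, not part of the commit:

    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SerializableOptionsSketch {
      public static void main(String[] args) {
        // Driver side: wrap the options once so Spark closures can capture them.
        SparkPipelineOptions sparkOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        SerializablePipelineOptions options = new SerializablePipelineOptions(sparkOptions);

        // Worker side: get() yields the (deserialized) PipelineOptions, and as()
        // re-views them as the runner-specific interface.
        long batchIntervalMillis =
            options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis();
        System.out.println(batchIntervalMillis);
      }
    }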

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 549bd30..1b54478 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -27,12 +27,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -91,7 +91,7 @@ public class StateSpecFunctions {
    *
    * <p>See also <a href="https://issues.apache.org/jira/browse/SPARK-4819">SPARK-4819</a>.</p>
    *
-   * @param runtimeContext    A serializable {@link SparkRuntimeContext}.
+   * @param options           A serializable {@link SerializablePipelineOptions}.
    * @param <T>               The type of the input stream elements.
    * @param <CheckpointMarkT> The type of the {@link UnboundedSource.CheckpointMark}.
    * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
@@ -99,7 +99,7 @@ public class StateSpecFunctions {
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
  scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
       Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
-           final SparkRuntimeContext runtimeContext, final String stepName) {
+      final SerializablePipelineOptions options, final String stepName) {
 
     return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>,
         State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() {
@@ -151,7 +151,7 @@ public class StateSpecFunctions {
         try {
           microbatchReader =
               (MicrobatchSource.Reader)
-                  microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+                  microbatchSource.getOrCreateReader(options.get(),
                                                      checkpointMark);
         } catch (IOException e) {
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 23e430a..463e507 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -50,7 +51,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public class EvaluationContext {
   private final JavaSparkContext jsc;
   private JavaStreamingContext jssc;
-  private final SparkRuntimeContext runtime;
   private final Pipeline pipeline;
   private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
   private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
@@ -60,12 +60,13 @@ public class EvaluationContext {
   private final SparkPCollectionView pviews = new SparkPCollectionView();
   private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
   private final PipelineOptions options;
+  private final SerializablePipelineOptions serializableOptions;
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.options = options;
-    this.runtime = new SparkRuntimeContext(options);
+    this.serializableOptions = new SerializablePipelineOptions(options);
   }
 
   public EvaluationContext(
@@ -90,8 +91,8 @@ public class EvaluationContext {
     return options;
   }
 
-  public SparkRuntimeContext getRuntimeContext() {
-    return runtime;
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializableOptions;
   }
 
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
@@ -254,7 +255,7 @@ public class EvaluationContext {
   }
 
   private String storageLevel() {
-    return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
+    return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 23d5b32..7299583 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -34,8 +34,8 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -59,11 +59,10 @@ import scala.Tuple2;
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
-  private final Accumulator<NamedAggregators> aggAccum;
   private final Accumulator<MetricsContainerStepMap> metricsAccum;
   private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -71,10 +70,9 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final boolean stateful;
 
   /**
-   * @param aggAccum       The Spark {@link Accumulator} that backs the Beam Aggregators.
    * @param metricsAccum       The Spark {@link Accumulator} that backs the Beam metrics.
    * @param doFn              The {@link DoFn} to be wrapped.
-   * @param runtimeContext    The {@link SparkRuntimeContext}.
+   * @param options    The {@link SerializablePipelineOptions}.
    * @param mainOutputTag     The main output {@link TupleTag}.
    * @param additionalOutputTags Additional {@link TupleTag output tags}.
    * @param sideInputs        Side inputs used in this {@link DoFn}.
@@ -82,21 +80,19 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param stateful          Stateful {@link DoFn}.
    */
   public MultiDoFnFunction(
-      Accumulator<NamedAggregators> aggAccum,
       Accumulator<MetricsContainerStepMap> metricsAccum,
       String stepName,
       DoFn<InputT, OutputT> doFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy,
       boolean stateful) {
-    this.aggAccum = aggAccum;
     this.metricsAccum = metricsAccum;
     this.stepName = stepName;
     this.doFn = doFn;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
     this.mainOutputTag = mainOutputTag;
     this.additionalOutputTags = additionalOutputTags;
     this.sideInputs = sideInputs;
@@ -140,7 +136,7 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     final DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
-            runtimeContext.getPipelineOptions(),
+            options.get(),
             doFn,
             new SparkSideInputReader(sideInputs),
             outputManager,

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 315f7fb..d8d71ff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -48,16 +49,16 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  * {@link org.apache.beam.sdk.transforms.Combine.CombineFn}.
  */
 public class SparkAbstractCombineFn implements Serializable {
-  protected final SparkRuntimeContext runtimeContext;
+  protected final SerializablePipelineOptions options;
   protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
 
 
   public SparkAbstractCombineFn(
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    this.runtimeContext = runtimeContext;
+    this.options = options;
     this.sideInputs = sideInputs;
     this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
   }
@@ -71,7 +72,7 @@ public class SparkAbstractCombineFn implements Serializable {
   private transient SparkCombineContext combineContext;
   protected SparkCombineContext ctxtForInput(WindowedValue<?> input) {
     if (combineContext == null) {
-      combineContext = new SparkCombineContext(runtimeContext.getPipelineOptions(),
+      combineContext = new SparkCombineContext(options.get(),
           new SparkSideInputReader(sideInputs));
     }
     return combineContext.forInput(input);
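
SparkAbstractCombineFn keeps combineContext transient and builds it lazily in ctxtForInput, because the context must be rebuilt after the function is deserialized on an executor. A self-contained rendering of that transient/lazy-init idiom, with hypothetical names (not part of this commit):

    import java.io.Serializable;
    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    /** Hypothetical sketch of the transient/lazy-init idiom used above. */
    public abstract class LazyOptionsFn implements Serializable {
      private final SerializablePipelineOptions options; // shippable, serialized state

      // Not serialized with the closure; rebuilt lazily on each worker.
      private transient PipelineOptions cached;

      protected LazyOptionsFn(SerializablePipelineOptions options) {
        this.options = options;
      }

      protected PipelineOptions pipelineOptions() {
        if (cached == null) {
          cached = options.get(); // deserialize once per instance, then reuse
        }
        return cached;
      }
    }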

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index d0e9038..81416a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -49,10 +50,10 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
 
   public SparkGlobalCombineFn(
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(runtimeContext, sideInputs, windowingStrategy);
+    super(options, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index d2a3424..fcf438c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -55,18 +56,18 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
   private final WindowingStrategy<?, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;
   private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
 
   public SparkGroupAlsoByWindowViaOutputBufferFn(
       WindowingStrategy<?, W> windowingStrategy,
       StateInternalsFactory<K> stateInternalsFactory,
       SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Accumulator<NamedAggregators> accumulator) {
     this.windowingStrategy = windowingStrategy;
     this.stateInternalsFactory = stateInternalsFactory;
     this.reduceFn = reduceFn;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
   }
 
   @Override
@@ -98,7 +99,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
             outputter,
             new UnsupportedSideInputReader("GroupAlsoByWindow"),
             reduceFn,
-            runtimeContext.getPipelineOptions());
+            options.get());
 
     // Process the grouped values.
     reduceFnRunner.processElements(values);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 7ac8e7d..55392e9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -49,10 +50,10 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
 
   public SparkKeyedCombineFn(
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(runtimeContext, sideInputs, windowingStrategy);
+    super(options, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
deleted file mode 100644
index 6361bb2..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * The SparkRuntimeContext allows us to define useful features on the client side before our
- * data flow program is launched.
- */
-public class SparkRuntimeContext implements Serializable {
-  private final Supplier<PipelineOptions> optionsSupplier;
-  private transient CoderRegistry coderRegistry;
-
-  SparkRuntimeContext(PipelineOptions options) {
-    String serializedPipelineOptions = serializePipelineOptions(options);
-    this.optionsSupplier =
-        Suppliers.memoize(
-            Suppliers.compose(
-                new DeserializeOptions(),
-                Suppliers.ofInstance(serializedPipelineOptions)));
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-
-  private String serializePipelineOptions(PipelineOptions pipelineOptions) {
-    try {
-      return createMapper().writeValueAsString(pipelineOptions);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options.", e);
-    }
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return optionsSupplier.get();
-  }
-
-  public CoderRegistry getCoderRegistry() {
-    if (coderRegistry == null) {
-      coderRegistry = CoderRegistry.createDefault();
-    }
-    return coderRegistry;
-  }
-
-  private static class DeserializeOptions
-      implements Function<String, PipelineOptions>, Serializable {
-    @Override
-    public PipelineOptions apply(String options) {
-      try {
-        return createMapper().readValue(options, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-      }
-    }
-  }
-}
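
The deleted class served two roles: a memoized Jackson round-trip for PipelineOptions, now covered by SerializablePipelineOptions, and a CoderRegistry accessor, now served by Pipeline.getCoderRegistry() (see the TransformTranslator hunks below). For reference, the round-trip it performed reduces to this standalone sketch, using the same calls as the deleted code; the class name is illustrative:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.util.common.ReflectHelpers;

    public class OptionsRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Same mapper construction as the deleted createMapper(): Jackson modules
        // found on the class path are registered, so custom option types stay serializable.
        ObjectMapper mapper = new ObjectMapper().registerModules(
            ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

        PipelineOptions options = PipelineOptionsFactory.create();
        String json = mapper.writeValueAsString(options);  // serialize on the driver
        PipelineOptions copy = mapper.readValue(json, PipelineOptions.class);  // revive elsewhere
        System.out.println(copy.getJobName());
      }
    }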

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac5e0cd..e060e1d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -146,7 +146,7 @@ public final class TransformTranslator {
                 windowingStrategy,
                 new TranslationUtils.InMemoryStateInternalsFactory<K>(),
                 SystemReduceFn.<K, V, W>buffering(coder.getValueCoder()),
-                context.getRuntimeContext(),
+                context.getSerializableOptions(),
                 accum));
 
         context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow));
@@ -171,7 +171,7 @@ public final class TransformTranslator {
                   (CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>)
                       CombineFnUtil.toFnWithContext(transform.getFn());
               final SparkKeyedCombineFn<K, InputT, ?, OutputT> sparkCombineFn =
-                  new SparkKeyedCombineFn<>(combineFn, context.getRuntimeContext(),
+                  new SparkKeyedCombineFn<>(combineFn, context.getSerializableOptions(),
                      TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                           context.getInput(transform).getWindowingStrategy());
 
@@ -222,18 +222,18 @@ public final class TransformTranslator {
             final WindowedValue.FullWindowedValueCoder<OutputT> wvoCoder =
                 WindowedValue.FullWindowedValueCoder.of(oCoder,
                     windowingStrategy.getWindowFn().windowCoder());
-            final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
             final boolean hasDefault = transform.isInsertDefault();
 
             final SparkGlobalCombineFn<InputT, AccumT, OutputT> sparkCombineFn =
                 new SparkGlobalCombineFn<>(
                     combineFn,
-                    runtimeContext,
+                    context.getSerializableOptions(),
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                     windowingStrategy);
             final Coder<AccumT> aCoder;
             try {
-              aCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), iCoder);
+              aCoder = combineFn.getAccumulatorCoder(
+                  context.getPipeline().getCoderRegistry(), iCoder);
             } catch (CannotProvideCoderException e) {
              throw new IllegalStateException("Could not determine coder for accumulator", e);
             }
@@ -295,16 +295,16 @@ public final class TransformTranslator {
             (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
        final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> sparkCombineFn =
-            new SparkKeyedCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy);
+            new SparkKeyedCombineFn<>(
+                combineFn, context.getSerializableOptions(), sideInputs, windowingStrategy);
         final Coder<AccumT> vaCoder;
         try {
           vaCoder =
               combineFn.getAccumulatorCoder(
-                  runtimeContext.getCoderRegistry(), inputCoder.getValueCoder());
+                  context.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
         } catch (CannotProvideCoderException e) {
          throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
@@ -360,7 +360,6 @@ public final class TransformTranslator {
            ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
         Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
 
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all;
@@ -370,11 +369,10 @@ public final class TransformTranslator {
             || signature.timerDeclarations().size() > 0;
 
         MultiDoFnFunction<InputT, OutputT> multiDoFnFunction = new MultiDoFnFunction<>(
-            aggAccum,
             metricsAccum,
             stepName,
             doFn,
-            context.getRuntimeContext(),
+            context.getSerializableOptions(),
             transform.getMainOutputTag(),
             transform.getAdditionalOutputTags().getAll(),
             TranslationUtils.getSideInputs(transform.getSideInputs(), context),
@@ -452,10 +450,11 @@ public final class TransformTranslator {
       public void evaluate(Read.Bounded<T> transform, EvaluationContext context) {
         String stepName = context.getCurrentTransform().getFullName();
         final JavaSparkContext jsc = context.getSparkContext();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         // create an RDD from a BoundedSource.
-        JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
-            jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
+        JavaRDD<WindowedValue<T>> input =
+            new SourceRDD.Bounded<>(
+                    jsc.sc(), transform.getSource(), context.getSerializableOptions(), stepName)
+                .toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index cd5bb3e..38d6119 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,9 +32,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
@@ -50,7 +49,6 @@ import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
 import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
 import org.apache.beam.runners.spark.translation.SparkPCollectionView;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
@@ -125,7 +123,7 @@ public final class StreamingTransformTranslator {
             transform,
             SparkUnboundedSource.read(
                 context.getStreamingContext(),
-                context.getRuntimeContext(),
+                context.getSerializableOptions(),
                 transform.getSource(),
                 stepName));
       }
@@ -273,7 +271,6 @@ public final class StreamingTransformTranslator {
         JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         @SuppressWarnings("unchecked")
         final WindowingStrategy<?, W> windowingStrategy =
             (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
@@ -303,7 +300,7 @@ public final class StreamingTransformTranslator {
                 coder.getKeyCoder(),
                 wvCoder,
                 windowingStrategy,
-                runtimeContext,
+                context.getSerializableOptions(),
                 streamSources);
 
         context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
@@ -336,7 +333,7 @@ public final class StreamingTransformTranslator {
             ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform));
         JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = unboundedDataset.getDStream();
 
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        final SerializablePipelineOptions options = context.getSerializableOptions();
         final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
@@ -347,7 +344,7 @@ public final class StreamingTransformTranslator {
                     call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> rdd)
                         throws Exception {
                         SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
-                            new SparkKeyedCombineFn<>(fn, runtimeContext,
+                            new SparkKeyedCombineFn<>(fn, options,
                                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                                 new JavaSparkContext(rdd.context()), pviews),
                                 windowingStrategy);
@@ -374,7 +371,7 @@ public final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectSplittable(doFn);
         rejectStateAndTimers(doFn);
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        final SerializablePipelineOptions options = context.getSerializableOptions();
         final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
@@ -393,8 +390,6 @@ public final class StreamingTransformTranslator {
                   @Override
                   public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
                       JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
-                    final Accumulator<NamedAggregators> aggAccum =
-                        AggregatorsAccumulator.getInstance();
                     final Accumulator<MetricsContainerStepMap> metricsAccum =
                         MetricsAccumulator.getInstance();
                    final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
@@ -405,11 +400,10 @@ public final class StreamingTransformTranslator {
                             pviews);
                     return rdd.mapPartitionsToPair(
                         new MultiDoFnFunction<>(
-                            aggAccum,
                             metricsAccum,
                             stepName,
                             doFn,
-                            runtimeContext,
+                            options,
                             transform.getMainOutputTag(),
                             transform.getAdditionalOutputTags().getAll(),
                             sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
deleted file mode 100644
index 456056a..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import static org.junit.Assert.assertEquals;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.CrashingRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link SparkRuntimeContext}.
- */
-@RunWith(JUnit4.class)
-public class SparkRuntimeContextTest {
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends PipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-
-  @Test
-  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
-        .as(JacksonIncompatibleOptions.class);
-    options.setRunner(CrashingRunner.class);
-    Pipeline p = Pipeline.create(options);
-    SparkRuntimeContext context = new SparkRuntimeContext(options);
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
-      outputStream.writeObject(context);
-    }
-    try (ObjectInputStream inputStream =
-        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
-      SparkRuntimeContext copy = (SparkRuntimeContext) inputStream.readObject();
-      assertEquals("testValue",
-          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
-              .getJacksonIncompatible().value);
-    }
-  }
-}
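
No replacement test appears in this commit; a roughly equivalent round-trip check against SerializablePipelineOptions would presumably look like the sketch below. It assumes the wrapper is java.io.Serializable, which the diff implies by storing it in Serializable function classes such as SparkAbstractCombineFn; the test class name and job name are hypothetical:

    import static org.junit.Assert.assertEquals;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.junit.Test;

    public class SerializablePipelineOptionsRoundTripTest {
      @Test
      public void testJavaSerializationRoundTrip() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs("--jobName=testJob").create();
        SerializablePipelineOptions wrapper = new SerializablePipelineOptions(options);

        // Java-serialize the wrapper, as Spark does when shipping closures.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
          out.writeObject(wrapper);
        }
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
          SerializablePipelineOptions copy = (SerializablePipelineOptions) in.readObject();
          assertEquals("testJob", copy.get().getJobName());
        }
      }
    }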

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 9a4d25a..5cc0b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -176,7 +176,12 @@ import org.joda.time.format.DateTimeFormatter;
  *
  * <h2>Serialization Of PipelineOptions</h2>
  *
- * {@link PipelineRunner}s require support for options to be serialized. Each property
+ * {@link PipelineOptions} is intentionally <i>not</i> marked {@link java.io.Serializable}, in order
+ * to discourage pipeline authors from capturing {@link PipelineOptions} at pipeline construction
+ * time, because a pipeline may be saved as a template and run with a different set of options
+ * than the ones it was constructed with. See {@link Pipeline#run(PipelineOptions)}.
+ *
+ * <p>However, {@link PipelineRunner}s require support for options to be serialized. Each property
  * within {@link PipelineOptions} must be able to be serialized using Jackson's
  * {@link ObjectMapper} or the getter method for the property annotated with
  * {@link JsonIgnore @JsonIgnore}.
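
The new paragraph has a practical corollary for user code: read options at execution time instead of baking construction-time values into a transform. A hedged sketch of the recommended style, using the standard execution-time accessor DoFn.ProcessContext#getPipelineOptions(); the DoFn here is hypothetical:

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.DoFn;

    class ReadOptionsAtRuntimeFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Execution-time lookup: sees the options the pipeline actually runs with,
        // even when it was constructed earlier (e.g. saved as a template).
        PipelineOptions runtimeOptions = c.getPipelineOptions();
        c.output(c.element() + " @ " + runtimeOptions.getJobName());
      }
    }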

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index bc479a2..2fffffa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -19,11 +19,9 @@ package org.apache.beam.sdk.options;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Utilities for working with the {@link ValueProvider} interface.
@@ -37,11 +35,9 @@ public class ValueProviders {
    */
   public static String updateSerializedOptions(
       String serializedOptions, Map<String, String> runtimeValues) {
-    ObjectMapper mapper = new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
     ObjectNode root, options;
     try {
-      root = mapper.readValue(serializedOptions, ObjectNode.class);
+      root = PipelineOptionsFactory.MAPPER.readValue(serializedOptions, ObjectNode.class);
       options = (ObjectNode) root.get("options");
       checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions);
     } catch (IOException e) {
@@ -53,7 +49,7 @@ public class ValueProviders {
       options.put(entry.getKey(), entry.getValue());
     }
     try {
-      return mapper.writeValueAsString(root);
+      return PipelineOptionsFactory.MAPPER.writeValueAsString(root);
     } catch (IOException e) {
       throw new RuntimeException("Unable to parse re-serialize options", e);
     }
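
updateSerializedOptions keeps its behavior and simply reuses the shared PipelineOptionsFactory.MAPPER. Going by the root.get("options") lookup above, it expects a JSON document with a top-level "options" object; a usage sketch with a hypothetical payload:

    import java.util.Collections;
    import org.apache.beam.sdk.options.ValueProviders;

    public class UpdateSerializedOptionsSketch {
      public static void main(String[] args) {
        // Hypothetical serialized form: a root object holding an "options" node.
        String serialized = "{\"options\":{\"jobName\":\"original\"}}";
        String updated = ValueProviders.updateSerializedOptions(
            serialized, Collections.singletonMap("jobName", "overridden"));
        System.out.println(updated);  // {"options":{"jobName":"overridden"}}
      }
    }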
