Repository: beam
Updated Branches:
  refs/heads/master 0a358c780 -> 339976c9f
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1385e07..1263618 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; @@ -38,7 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.runners.spark.translation.TranslationUtils; import org.apache.beam.runners.spark.translation.WindowingHelpers; import org.apache.beam.runners.spark.util.ByteArray; @@ -108,11 +108,11 @@ public class SparkGroupAlsoByWindowViaWindowSet { final Coder<K> keyCoder, final Coder<WindowedValue<InputT>> wvCoder, final WindowingStrategy<?, W> windowingStrategy, - final SparkRuntimeContext runtimeContext, + final SerializablePipelineOptions options, final List<Integer> sourceIds) { final long batchDurationMillis = - runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis(); + options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis(); final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder); final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder(); final Coder<? extends BoundedWindow> wCoder = @@ -123,7 +123,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); long checkpointDurationMillis = - runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class) + options.get().as(SparkPipelineOptions.class) .getCheckpointDurationMillis(); // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819. @@ -268,7 +268,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { outputHolder, new UnsupportedSideInputReader("GroupAlsoByWindow"), reduceFn, - runtimeContext.getPipelineOptions()); + options.get()); outputHolder.clear(); // clear before potential use. 
if (!seq.isEmpty()) { http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 549bd30..1b54478 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -27,12 +27,12 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.EmptyCheckpointMark; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; @@ -91,7 +91,7 @@ public class StateSpecFunctions { * * <p>See also <a href="https://issues.apache.org/jira/browse/SPARK-4819">SPARK-4819</a>.</p> * - * @param runtimeContext A serializable {@link SparkRuntimeContext}. + * @param options A serializable {@link SerializablePipelineOptions}. * @param <T> The type of the input stream elements. * @param <CheckpointMarkT> The type of the {@link UnboundedSource.CheckpointMark}. * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function. 
@@ -99,7 +99,7 @@ public class StateSpecFunctions { public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction( - final SparkRuntimeContext runtimeContext, final String stepName) { + final SerializablePipelineOptions options, final String stepName) { return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() { @@ -151,7 +151,7 @@ public class StateSpecFunctions { try { microbatchReader = (MicrobatchSource.Reader) - microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(), + microbatchSource.getOrCreateReader(options.get(), checkpointMark); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 23e430a..463e507 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -50,7 +51,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class EvaluationContext { private final JavaSparkContext jsc; private JavaStreamingContext jssc; - private final SparkRuntimeContext runtime; private final Pipeline pipeline; private final Map<PValue, Dataset> datasets = new LinkedHashMap<>(); private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>(); @@ -60,12 +60,13 @@ public class EvaluationContext { private final SparkPCollectionView pviews = new SparkPCollectionView(); private final Map<PCollection, Long> cacheCandidates = new HashMap<>(); private final PipelineOptions options; + private final SerializablePipelineOptions serializableOptions; public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) { this.jsc = jsc; this.pipeline = pipeline; this.options = options; - this.runtime = new SparkRuntimeContext(options); + this.serializableOptions = new SerializablePipelineOptions(options); } public EvaluationContext( @@ -90,8 +91,8 @@ public class EvaluationContext { return options; } - public SparkRuntimeContext getRuntimeContext() { - return runtime; + public SerializablePipelineOptions getSerializableOptions() { + return serializableOptions; } public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { @@ -254,7 +255,7 @@ public class EvaluationContext { } private String storageLevel() { - return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel(); + return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel(); } } 
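Every hunk above makes the same substitution: call sites that used to hold a SparkRuntimeContext now hold a SerializablePipelineOptions and call options.get() at execution time, optionally narrowing with .as(SparkPipelineOptions.class). For orientation, here is a minimal sketch of that wrapper pattern, assuming Jackson-backed lazy deserialization like the deleted SparkRuntimeContext shown further down; the real SerializablePipelineOptions lives in runners-core-construction and its source is not part of this diff:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.Serializable;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.util.common.ReflectHelpers;

    // A minimal sketch, not the actual SerializablePipelineOptions implementation.
    public class SerializableOptionsSketch implements Serializable {

      // PipelineOptions is not java.io.Serializable, so the options travel as a
      // Jackson JSON string inside the serialized Spark closure.
      private final String json;

      // Re-created lazily in each JVM (driver and executors alike).
      private transient volatile PipelineOptions options;

      public SerializableOptionsSketch(PipelineOptions options) {
        try {
          this.json = mapper().writeValueAsString(options);
        } catch (Exception e) {
          throw new IllegalStateException("Failed to serialize the pipeline options.", e);
        }
      }

      public PipelineOptions get() {
        if (options == null) {
          try {
            options = mapper().readValue(json, PipelineOptions.class);
          } catch (Exception e) {
            throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
          }
        }
        return options;
      }

      // Registering Modules found on the classpath lets custom user types on the
      // options round-trip, mirroring createMapper() in the deleted SparkRuntimeContext.
      private static ObjectMapper mapper() {
        return new ObjectMapper()
            .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
      }
    }

Worker-side code then recovers a typed view exactly as the new call sites do, for example options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis().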
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 23d5b32..7299583 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -34,8 +34,8 @@ import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.transforms.DoFn; @@ -59,11 +59,10 @@ import scala.Tuple2; public class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> { - private final Accumulator<NamedAggregators> aggAccum; private final Accumulator<MetricsContainerStepMap> metricsAccum; private final String stepName; private final DoFn<InputT, OutputT> doFn; - private final SparkRuntimeContext runtimeContext; + private final SerializablePipelineOptions options; private final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> additionalOutputTags; private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; @@ -71,10 +70,9 @@ public class MultiDoFnFunction<InputT, OutputT> private final boolean stateful; /** - * @param aggAccum The Spark {@link Accumulator} that backs the Beam Aggregators. * @param metricsAccum The Spark {@link Accumulator} that backs the Beam metrics. * @param doFn The {@link DoFn} to be wrapped. - * @param runtimeContext The {@link SparkRuntimeContext}. + * @param options The {@link SerializablePipelineOptions}. * @param mainOutputTag The main output {@link TupleTag}. * @param additionalOutputTags Additional {@link TupleTag output tags}. * @param sideInputs Side inputs used in this {@link DoFn}. @@ -82,21 +80,19 @@ public class MultiDoFnFunction<InputT, OutputT> * @param stateful Stateful {@link DoFn}. 
*/ public MultiDoFnFunction( - Accumulator<NamedAggregators> aggAccum, Accumulator<MetricsContainerStepMap> metricsAccum, String stepName, DoFn<InputT, OutputT> doFn, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy, boolean stateful) { - this.aggAccum = aggAccum; this.metricsAccum = metricsAccum; this.stepName = stepName; this.doFn = doFn; - this.runtimeContext = runtimeContext; + this.options = options; this.mainOutputTag = mainOutputTag; this.additionalOutputTags = additionalOutputTags; this.sideInputs = sideInputs; @@ -140,7 +136,7 @@ public class MultiDoFnFunction<InputT, OutputT> final DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( - runtimeContext.getPipelineOptions(), + options.get(), doFn, new SparkSideInputReader(sideInputs), outputManager, http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index 315f7fb..d8d71ff 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.options.PipelineOptions; @@ -48,16 +49,16 @@ import org.apache.beam.sdk.values.WindowingStrategy; * {@link org.apache.beam.sdk.transforms.Combine.CombineFn}. 
*/ public class SparkAbstractCombineFn implements Serializable { - protected final SparkRuntimeContext runtimeContext; + protected final SerializablePipelineOptions options; protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; protected final WindowingStrategy<?, BoundedWindow> windowingStrategy; public SparkAbstractCombineFn( - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) { - this.runtimeContext = runtimeContext; + this.options = options; this.sideInputs = sideInputs; this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy; } @@ -71,7 +72,7 @@ public class SparkAbstractCombineFn implements Serializable { private transient SparkCombineContext combineContext; protected SparkCombineContext ctxtForInput(WindowedValue<?> input) { if (combineContext == null) { - combineContext = new SparkCombineContext(runtimeContext.getPipelineOptions(), + combineContext = new SparkCombineContext(options.get(), new SparkSideInputReader(sideInputs)); } return combineContext.forInput(input); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index d0e9038..81416a3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -49,10 +50,10 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract public SparkGlobalCombineFn( CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) { - super(runtimeContext, sideInputs, windowingStrategy); + super(options, sideInputs, windowingStrategy); this.combineFn = combineFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index d2a3424..fcf438c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -55,18 +56,18 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde private final WindowingStrategy<?, W> windowingStrategy; private final StateInternalsFactory<K> stateInternalsFactory; private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn; - private final SparkRuntimeContext runtimeContext; + private final SerializablePipelineOptions options; public SparkGroupAlsoByWindowViaOutputBufferFn( WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, Accumulator<NamedAggregators> accumulator) { this.windowingStrategy = windowingStrategy; this.stateInternalsFactory = stateInternalsFactory; this.reduceFn = reduceFn; - this.runtimeContext = runtimeContext; + this.options = options; } @Override @@ -98,7 +99,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde outputter, new UnsupportedSideInputReader("GroupAlsoByWindow"), reduceFn, - runtimeContext.getPipelineOptions()); + options.get()); // Process the grouped values. 
reduceFnRunner.processElements(values); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index 7ac8e7d..55392e9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -49,10 +50,10 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra public SparkKeyedCombineFn( CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) { - super(runtimeContext, sideInputs, windowingStrategy); + super(options, sideInputs, windowingStrategy); this.combineFn = combineFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java deleted file mode 100644 index 6361bb2..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.translation; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.common.ReflectHelpers; - -/** - * The SparkRuntimeContext allows us to define useful features on the client side before our - * data flow program is launched. - */ -public class SparkRuntimeContext implements Serializable { - private final Supplier<PipelineOptions> optionsSupplier; - private transient CoderRegistry coderRegistry; - - SparkRuntimeContext(PipelineOptions options) { - String serializedPipelineOptions = serializePipelineOptions(options); - this.optionsSupplier = - Suppliers.memoize( - Suppliers.compose( - new DeserializeOptions(), - Suppliers.ofInstance(serializedPipelineOptions))); - } - - /** - * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing - * for user specified configuration injection into the ObjectMapper. This supports user custom - * types on {@link PipelineOptions}. - */ - private static ObjectMapper createMapper() { - return new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - } - - private String serializePipelineOptions(PipelineOptions pipelineOptions) { - try { - return createMapper().writeValueAsString(pipelineOptions); - } catch (JsonProcessingException e) { - throw new IllegalStateException("Failed to serialize the pipeline options.", e); - } - } - - public PipelineOptions getPipelineOptions() { - return optionsSupplier.get(); - } - - public CoderRegistry getCoderRegistry() { - if (coderRegistry == null) { - coderRegistry = CoderRegistry.createDefault(); - } - return coderRegistry; - } - - private static class DeserializeOptions - implements Function<String, PipelineOptions>, Serializable { - @Override - public PipelineOptions apply(String options) { - try { - return createMapper().readValue(options, PipelineOptions.class); - } catch (IOException e) { - throw new IllegalStateException("Failed to deserialize the pipeline options.", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index ac5e0cd..e060e1d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -146,7 +146,7 @@ public final class TransformTranslator { windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(), SystemReduceFn.<K, V, W>buffering(coder.getValueCoder()), - context.getRuntimeContext(), + context.getSerializableOptions(), accum)); context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow)); @@ -171,7 +171,7 @@ public final class TransformTranslator { 
(CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>) CombineFnUtil.toFnWithContext(transform.getFn()); final SparkKeyedCombineFn<K, InputT, ?, OutputT> sparkCombineFn = - new SparkKeyedCombineFn<>(combineFn, context.getRuntimeContext(), + new SparkKeyedCombineFn<>(combineFn, context.getSerializableOptions(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), context.getInput(transform).getWindowingStrategy()); @@ -222,18 +222,18 @@ public final class TransformTranslator { final WindowedValue.FullWindowedValueCoder<OutputT> wvoCoder = WindowedValue.FullWindowedValueCoder.of(oCoder, windowingStrategy.getWindowFn().windowCoder()); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final boolean hasDefault = transform.isInsertDefault(); final SparkGlobalCombineFn<InputT, AccumT, OutputT> sparkCombineFn = new SparkGlobalCombineFn<>( combineFn, - runtimeContext, + context.getSerializableOptions(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), windowingStrategy); final Coder<AccumT> aCoder; try { - aCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), iCoder); + aCoder = combineFn.getAccumulatorCoder( + context.getPipeline().getCoderRegistry(), iCoder); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } @@ -295,16 +295,16 @@ public final class TransformTranslator { (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>) CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> sparkCombineFn = - new SparkKeyedCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy); + new SparkKeyedCombineFn<>( + combineFn, context.getSerializableOptions(), sideInputs, windowingStrategy); final Coder<AccumT> vaCoder; try { vaCoder = combineFn.getAccumulatorCoder( - runtimeContext.getCoderRegistry(), inputCoder.getValueCoder()); + context.getPipeline().getCoderRegistry(), inputCoder.getValueCoder()); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } @@ -360,7 +360,6 @@ public final class TransformTranslator { ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); JavaPairRDD<TupleTag<?>, WindowedValue<?>> all; @@ -370,11 +369,10 @@ public final class TransformTranslator { || signature.timerDeclarations().size() > 0; MultiDoFnFunction<InputT, OutputT> multiDoFnFunction = new MultiDoFnFunction<>( - aggAccum, metricsAccum, stepName, doFn, - context.getRuntimeContext(), + context.getSerializableOptions(), transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), @@ -452,10 +450,11 @@ public final class TransformTranslator { public void evaluate(Read.Bounded<T> transform, EvaluationContext context) { String stepName = 
context.getCurrentTransform().getFullName(); final JavaSparkContext jsc = context.getSparkContext(); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); // create an RDD from a BoundedSource. - JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>( - jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD(); + JavaRDD<WindowedValue<T>> input = + new SourceRDD.Bounded<>( + jsc.sc(), transform.getSource(), context.getSerializableOptions(), stepName) + .toJavaRDD(); // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation. context.putDataset(transform, new BoundedDataset<>(input.cache())); } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index cd5bb3e..38d6119 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -32,9 +32,8 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; @@ -50,7 +49,6 @@ import org.apache.beam.runners.spark.translation.SparkAssignWindowFn; import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn; import org.apache.beam.runners.spark.translation.SparkPCollectionView; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TranslationUtils; import org.apache.beam.runners.spark.translation.WindowingHelpers; @@ -125,7 +123,7 @@ public final class StreamingTransformTranslator { transform, SparkUnboundedSource.read( context.getStreamingContext(), - context.getRuntimeContext(), + context.getSerializableOptions(), transform.getSource(), stepName)); } @@ -273,7 +271,6 @@ public final class StreamingTransformTranslator { JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream(); @SuppressWarnings("unchecked") final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); @SuppressWarnings("unchecked") final WindowingStrategy<?, W> windowingStrategy = (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy(); @@ -303,7 +300,7 @@ public final class StreamingTransformTranslator { coder.getKeyCoder(), wvCoder, windowingStrategy, - runtimeContext, + context.getSerializableOptions(), streamSources); context.putDataset(transform, new 
UnboundedDataset<>(outStream, streamSources)); @@ -336,7 +333,7 @@ public final class StreamingTransformTranslator { ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform)); JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = unboundedDataset.getDStream(); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); + final SerializablePipelineOptions options = context.getSerializableOptions(); final SparkPCollectionView pviews = context.getPViews(); JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform( @@ -347,7 +344,7 @@ public final class StreamingTransformTranslator { call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> rdd) throws Exception { SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext = - new SparkKeyedCombineFn<>(fn, runtimeContext, + new SparkKeyedCombineFn<>(fn, options, TranslationUtils.getSideInputs(transform.getSideInputs(), new JavaSparkContext(rdd.context()), pviews), windowingStrategy); @@ -374,7 +371,7 @@ public final class StreamingTransformTranslator { final DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); rejectStateAndTimers(doFn); - final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); + final SerializablePipelineOptions options = context.getSerializableOptions(); final SparkPCollectionView pviews = context.getPViews(); final WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); @@ -393,8 +390,6 @@ public final class StreamingTransformTranslator { @Override public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call( JavaRDD<WindowedValue<InputT>> rdd) throws Exception { - final Accumulator<NamedAggregators> aggAccum = - AggregatorsAccumulator.getInstance(); final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> @@ -405,11 +400,10 @@ public final class StreamingTransformTranslator { pviews); return rdd.mapPartitionsToPair( new MultiDoFnFunction<>( - aggAccum, metricsAccum, stepName, doFn, - runtimeContext, + options, transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java deleted file mode 100644 index 456056a..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import static org.junit.Assert.assertEquals; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.auto.service.AutoService; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.CrashingRunner; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link SparkRuntimeContext}. - */ -@RunWith(JUnit4.class) -public class SparkRuntimeContextTest { - /** PipelineOptions used to test auto registration of Jackson modules. */ - public interface JacksonIncompatibleOptions extends PipelineOptions { - JacksonIncompatible getJacksonIncompatible(); - void setJacksonIncompatible(JacksonIncompatible value); - } - - /** A Jackson {@link Module} to test auto-registration of modules. */ - @AutoService(Module.class) - public static class RegisteredTestModule extends SimpleModule { - public RegisteredTestModule() { - super("RegisteredTestModule"); - setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); - } - } - - /** A class which Jackson does not know how to serialize/deserialize. */ - public static class JacksonIncompatible { - private final String value; - public JacksonIncompatible(String value) { - this.value = value; - } - } - - /** A Jackson mixin used to add annotations to other classes. */ - @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) - @JsonSerialize(using = JacksonIncompatibleSerializer.class) - public static final class JacksonIncompatibleMixin {} - - /** A Jackson deserializer for {@link JacksonIncompatible}. */ - public static class JacksonIncompatibleDeserializer extends - JsonDeserializer<JacksonIncompatible> { - - @Override - public JacksonIncompatible deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - return new JacksonIncompatible(jsonParser.readValueAs(String.class)); - } - } - - /** A Jackson serializer for {@link JacksonIncompatible}. 
 */
- public class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-
-  @Test
-  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
-        .as(JacksonIncompatibleOptions.class);
-    options.setRunner(CrashingRunner.class);
-    Pipeline p = Pipeline.create(options);
-    SparkRuntimeContext context = new SparkRuntimeContext(options);
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
-      outputStream.writeObject(context);
-    }
-    try (ObjectInputStream inputStream =
-        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
-      SparkRuntimeContext copy = (SparkRuntimeContext) inputStream.readObject();
-      assertEquals("testValue",
-          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
-              .getJacksonIncompatible().value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 9a4d25a..5cc0b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -176,7 +176,12 @@ import org.joda.time.format.DateTimeFormatter;
  *
  * <h2>Serialization Of PipelineOptions</h2>
  *
- * {@link PipelineRunner}s require support for options to be serialized. Each property
+ * {@link PipelineOptions} is intentionally <i>not</i> marked {@link java.io.Serializable}, in order
+ * to discourage pipeline authors from capturing {@link PipelineOptions} at pipeline construction
+ * time, because a pipeline may be saved as a template and run with a different set of options
+ * than the ones it was constructed with. See {@link Pipeline#run(PipelineOptions)}.
+ *
+ * <p>However, {@link PipelineRunner}s require support for options to be serialized. Each property
  * within {@link PipelineOptions} must be able to be serialized using Jackson's
  * {@link ObjectMapper} or the getter method for the property annotated with
  * {@link JsonIgnore @JsonIgnore}.
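The new javadoc states a contract worth spelling out: options must survive a Jackson round trip even though they are not java.io.Serializable. A minimal sketch of that round trip, assuming an illustrative --jobName value; the classpath module scan mirrors how the runners build their mappers:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class OptionsRoundTrip {
      public static void main(String[] args) throws Exception {
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--jobName=roundtrip-demo").create();

        // Scanning for Jackson Modules on the classpath is what lets user-defined
        // types on options (like JacksonIncompatible in the deleted test above)
        // survive serialization.
        ObjectMapper mapper =
            new ObjectMapper()
                .registerModules(
                    ObjectMapper.findModules(OptionsRoundTrip.class.getClassLoader()));

        String json = mapper.writeValueAsString(options);
        PipelineOptions copy = mapper.readValue(json, PipelineOptions.class);

        System.out.println(copy.getJobName()); // prints "roundtrip-demo"
      }
    }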
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index bc479a2..2fffffa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -19,11 +19,9 @@ package org.apache.beam.sdk.options;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Utilities for working with the {@link ValueProvider} interface.
@@ -37,11 +35,9 @@ public class ValueProviders {
    */
   public static String updateSerializedOptions(
       String serializedOptions, Map<String, String> runtimeValues) {
-    ObjectMapper mapper = new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
     ObjectNode root, options;
     try {
-      root = mapper.readValue(serializedOptions, ObjectNode.class);
+      root = PipelineOptionsFactory.MAPPER.readValue(serializedOptions, ObjectNode.class);
       options = (ObjectNode) root.get("options");
       checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions);
     } catch (IOException e) {
@@ -53,7 +49,7 @@ public class ValueProviders {
       options.put(entry.getKey(), entry.getValue());
     }
     try {
-      return mapper.writeValueAsString(root);
+      return PipelineOptionsFactory.MAPPER.writeValueAsString(root);
     } catch (IOException e) {
       throw new RuntimeException("Unable to parse re-serialize options", e);
     }
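For callers, the behavior of updateSerializedOptions is unchanged by this refactor: it still rewrites entries under the top-level "options" node of the serialized form (the shape the checkNotNull above insists on). A hedged usage sketch, with illustrative jobName values:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProviders;

    public class UpdateSerializedOptionsExample {
      public static void main(String[] args) throws Exception {
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--jobName=before").create();

        // Jackson serializes options as {"options": {...}}, which is the shape
        // updateSerializedOptions expects.
        String serialized = new ObjectMapper().writeValueAsString(options);

        String updated =
            ValueProviders.updateSerializedOptions(
                serialized, ImmutableMap.of("jobName", "after"));

        // "updated" now deserializes to options whose jobName is "after", with
        // every other property left as it was.
        System.out.println(updated);
      }
    }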