Create broadcast lazily; address Amit's review comments; rename BroadcastHelper to SideInputBroadcast
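For context on the "create broadcast lazily" part of this change, the pattern the patch introduces in SparkPCollectionView (see the diff below) is double-checked locking keyed by the PCollectionView: the broadcast is only created on first use instead of eagerly on putPView. The following is a minimal, hypothetical sketch of that pattern, not the actual class from the patch; the class name LazyBroadcastCache and the generic key type are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

/** Illustrative sketch of lazy, double-checked broadcast creation keyed by view. */
class LazyBroadcastCache<K> {
  // Transient and volatile, as in SparkPCollectionView, so the map is rebuilt
  // (and broadcasts re-created lazily) after resuming from a checkpoint.
  private transient volatile Map<K, Broadcast<byte[]>> cache = null;

  Broadcast<byte[]> getOrBroadcast(K key, byte[] bytes, JavaSparkContext jsc) {
    // First check-lock-check: initialize the map itself lazily.
    if (cache == null) {
      synchronized (LazyBroadcastCache.class) {
        if (cache == null) {
          cache = new LinkedHashMap<>();
        }
      }
    }
    // Second check-lock-check: broadcast the bytes only on first use of this key.
    Broadcast<byte[]> bcast = cache.get(key);
    if (bcast == null) {
      synchronized (LazyBroadcastCache.class) {
        bcast = cache.get(key);
        if (bcast == null) {
          bcast = jsc.broadcast(bytes);
          cache.put(key, bcast);
        }
      }
    }
    return bcast;
  }
}
```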
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/662934b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/662934b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/662934b1 Branch: refs/heads/master Commit: 662934b1e88c9697585b05e29d9e7b4a34fc6943 Parents: 130c113 Author: ksalant <ksal...@payapal.com> Authored: Thu Jan 5 09:40:50 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Tue Jan 10 19:12:25 2017 +0200 ---------------------------------------------------------------------- .../runners/spark/translation/DoFnFunction.java | 6 +- .../spark/translation/EvaluationContext.java | 11 ++- .../translation/GroupCombineFunctions.java | 8 +- .../spark/translation/MultiDoFnFunction.java | 7 +- .../translation/SparkAbstractCombineFn.java | 12 +-- .../spark/translation/SparkGlobalCombineFn.java | 13 ++-- .../spark/translation/SparkKeyedCombineFn.java | 13 ++-- .../spark/translation/SparkPCollectionView.java | 42 +++++----- .../spark/translation/TransformTranslator.java | 20 ++--- .../spark/translation/TranslationUtils.java | 25 +++--- .../streaming/StreamingTransformTranslator.java | 20 ++--- .../runners/spark/util/BroadcastHelper.java | 82 -------------------- .../runners/spark/util/SideInputBroadcast.java | 77 ++++++++++++++++++ .../spark/util/SparkSideInputReader.java | 9 +-- .../ResumeFromCheckpointStreamingTest.java | 3 +- .../src/main/resources/beam/findbugs-filter.xml | 26 ------- 16 files changed, 166 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index 6a641b5..af8e089 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -27,7 +27,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; @@ -50,7 +50,7 @@ public class DoFnFunction<InputT, OutputT> private final Accumulator<NamedAggregators> accumulator; private final DoFn<InputT, OutputT> doFn; private final SparkRuntimeContext runtimeContext; - private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs; + private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; private final WindowingStrategy<?, ?> windowingStrategy; /** @@ -64,7 +64,7 @@ public class DoFnFunction<InputT, OutputT> Accumulator<NamedAggregators> accumulator, DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> 
sideInputs, WindowingStrategy<?, ?> windowingStrategy) { this.accumulator = accumulator; http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index b1a1142..0ad862d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -192,23 +192,26 @@ public class EvaluationContext { } /** - * Retruns the current views creates in the pipepline. + * Retrun the current views creates in the pipepline. + * * @return SparkPCollectionView */ - public SparkPCollectionView getPviews() { + public SparkPCollectionView getPViews() { return pviews; } /** * Adds/Replaces a view to the current views creates in the pipepline. + * * @param view - Identifier of the view * @param value - Actual value of the view * @param coder - Coder of the value */ - public void putPView(PCollectionView<?> view, + public void putPView( + PCollectionView<?> view, Iterable<WindowedValue<?>> value, Coder<Iterable<WindowedValue<?>>> coder) { - pviews.putPView(view, value, coder, jsc); + pviews.putPView(view, value, coder); } <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index 4875b0c..bb95065 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.util.BroadcastHelper; import org.apache.beam.runners.spark.util.ByteArray; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -102,7 +102,7 @@ public class GroupCombineFunctions { final Coder<OutputT> oCoder, final SparkRuntimeContext runtimeContext, final WindowingStrategy<?, ?> windowingStrategy, - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, boolean hasDefault) { // handle empty input RDD, which will natively skip the entire execution as Spark will not @@ -190,8 +190,8 @@ public class GroupCombineFunctions { final KvCoder<K, InputT> inputCoder, final SparkRuntimeContext runtimeContext, final WindowingStrategy<?, ?> windowingStrategy, - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, - BroadcastHelper<?>>> sideInputs) { + final 
Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> + sideInputs) { //--- coders. final Coder<K> keyCoder = inputCoder.getKeyCoder(); final Coder<InputT> viCoder = inputCoder.getValueCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 8a55369..0f9417a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -29,7 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; @@ -56,7 +56,7 @@ public class MultiDoFnFunction<InputT, OutputT> private final DoFn<InputT, OutputT> doFn; private final SparkRuntimeContext runtimeContext; private final TupleTag<OutputT> mainOutputTag; - private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs; + private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; private final WindowingStrategy<?, ?> windowingStrategy; /** @@ -72,8 +72,7 @@ public class MultiDoFnFunction<InputT, OutputT> DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, TupleTag<OutputT> mainOutputTag, - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, - BroadcastHelper<?>>> sideInputs, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) { this.accumulator = accumulator; http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index 6aeb0db..fa1c3fc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -29,7 +29,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineWithContext; @@ -49,14 +49,14 @@ import org.apache.beam.sdk.values.TupleTag; */ public class SparkAbstractCombineFn implements Serializable { protected final SparkRuntimeContext runtimeContext; - protected final 
Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs; + protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; protected final WindowingStrategy<?, ?> windowingStrategy; - public SparkAbstractCombineFn(SparkRuntimeContext runtimeContext, - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> - sideInputs, - WindowingStrategy<?, ?> windowingStrategy) { + public SparkAbstractCombineFn( + SparkRuntimeContext runtimeContext, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, + WindowingStrategy<?, ?> windowingStrategy) { this.runtimeContext = runtimeContext; this.sideInputs = sideInputs; this.windowingStrategy = windowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index 5339fb3..23f5d20 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -46,12 +46,11 @@ import org.joda.time.Instant; public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn { private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn; - public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> - combineFn, - SparkRuntimeContext runtimeContext, - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> - sideInputs, - WindowingStrategy<?, ?> windowingStrategy) { + public SparkGlobalCombineFn( + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, + SparkRuntimeContext runtimeContext, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, + WindowingStrategy<?, ?> windowingStrategy) { super(runtimeContext, sideInputs, windowingStrategy); this.combineFn = combineFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index 910f7f0..b5d243f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import 
org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -46,12 +46,11 @@ import org.joda.time.Instant; public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn { private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; - public SparkKeyedCombineFn(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, - OutputT> combineFn, - SparkRuntimeContext runtimeContext, - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> - sideInputs, - WindowingStrategy<?, ?> windowingStrategy) { + public SparkKeyedCombineFn( + CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn, + SparkRuntimeContext runtimeContext, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, + WindowingStrategy<?, ?> windowingStrategy) { super(runtimeContext, sideInputs, windowingStrategy); this.combineFn = combineFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java index e888182..f71cb6b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java @@ -21,7 +21,7 @@ import java.io.Serializable; import java.util.LinkedHashMap; import java.util.Map; import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; @@ -34,7 +34,8 @@ import scala.Tuple2; public class SparkPCollectionView implements Serializable { // Holds the view --> broadcast mapping. 
Transient so it will be null from resume - private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null; + private transient volatile Map<PCollectionView<?>, SideInputBroadcast> + broadcastHelperMap = null; // Holds the Actual data of the views in serialize form private Map<PCollectionView<?>, @@ -45,24 +46,25 @@ public class SparkPCollectionView implements Serializable { void putPView( PCollectionView<?> view, Iterable<WindowedValue<?>> value, - Coder<Iterable<WindowedValue<?>>> coder, - JavaSparkContext context) { + Coder<Iterable<WindowedValue<?>>> coder) { pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder)); - // overwrite/create broadcast - Future improvement is to initialize the BH lazily - getPCollectionView(view, context, true); - } - BroadcastHelper getPCollectionView( - PCollectionView<?> view, - JavaSparkContext context) { - return getPCollectionView(view, context, false); + // Currently unsynchronized unpersist, if needed can be changed to blocking + if (broadcastHelperMap != null) { + synchronized (SparkPCollectionView.class) { + SideInputBroadcast helper = broadcastHelperMap.get(view); + if (helper != null) { + helper.unpersist(); + broadcastHelperMap.remove(view); + } + } + } } - private BroadcastHelper getPCollectionView( + SideInputBroadcast getPCollectionView( PCollectionView<?> view, - JavaSparkContext context, - boolean overwrite) { + JavaSparkContext context) { // initialize broadcastHelperMap if needed if (broadcastHelperMap == null) { synchronized (SparkPCollectionView.class) { @@ -73,7 +75,7 @@ public class SparkPCollectionView implements Serializable { } //lazily broadcast views - BroadcastHelper helper = broadcastHelperMap.get(view); + SideInputBroadcast helper = broadcastHelperMap.get(view); if (helper == null) { synchronized (SparkPCollectionView.class) { helper = broadcastHelperMap.get(view); @@ -81,21 +83,15 @@ public class SparkPCollectionView implements Serializable { helper = createBroadcastHelper(view, context); } } - } else if (overwrite) { - synchronized (SparkPCollectionView.class) { - // Currently unsynchronized unpersist, if needed can be changed to blocking - helper.unpersist(); - helper = createBroadcastHelper(view, context); - } } return helper; } - private BroadcastHelper createBroadcastHelper( + private SideInputBroadcast createBroadcastHelper( PCollectionView<?> view, JavaSparkContext context) { Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view); - BroadcastHelper helper = BroadcastHelper.create(tuple2._1, tuple2._2); + SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2); helper.broadcast(context); broadcastHelperMap.put(view, helper); return helper; http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 0cf3dc6..3e941e4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO; import 
org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat; import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -178,7 +178,7 @@ public final class TransformTranslator { CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); final boolean hasDefault = transform.isInsertDefault(); @@ -210,7 +210,7 @@ public final class TransformTranslator { CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); @SuppressWarnings("unchecked") @@ -237,7 +237,7 @@ public final class TransformTranslator { context.getInput(transform).getWindowingStrategy(); Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(context.getSparkContext()); - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); context.putDataset(transform, new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, doFn, @@ -536,9 +536,7 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter; - context.putPView(output, - iterCast, - coderInternal); + context.putPView(output, iterCast, coderInternal); } }; } @@ -555,9 +553,7 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter; - context.putPView(output, - iterCast, - coderInternal); + context.putPView(output, iterCast, coderInternal); } }; } @@ -576,9 +572,7 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter; - context.putPView(output, - iterCast, - coderInternal); + context.putPView(output, iterCast, coderInternal); } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index ae9cb3e..965330c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; @@ -189,11 +189,11 @@ public final class TranslationUtils { * * @param views The {@link PCollectionView}s. * @param context The {@link EvaluationContext}. - * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}. + * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}. */ - static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> + static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) { - return getSideInputs(views, context.getSparkContext(), context.getPviews()); + return getSideInputs(views, context.getSparkContext(), context.getPViews()); } /** @@ -202,22 +202,23 @@ public final class TranslationUtils { * @param views The {@link PCollectionView}s. * @param context The {@link JavaSparkContext}. * @param pviews The {@link SparkPCollectionView}. - * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}. + * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}. */ - public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> - getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context, - SparkPCollectionView pviews) { - + public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> + getSideInputs( + List<PCollectionView<?>> views, + JavaSparkContext context, + SparkPCollectionView pviews) { if (views == null) { return ImmutableMap.of(); } else { - Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = Maps.newHashMap(); for (PCollectionView<?> view : views) { - BroadcastHelper helper = pviews.getPCollectionView(view, context); + SideInputBroadcast helper = pviews.getPCollectionView(view, context); WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal(); sideInputs.put(view.getTagInternal(), - KV.<WindowingStrategy<?, ?>, BroadcastHelper<?>>of(windowingStrategy, helper)); + KV.<WindowingStrategy<?, ?>, SideInputBroadcast<?>>of(windowingStrategy, helper)); } return sideInputs; } http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 0b2b4d6..3c89b99 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -43,7 +43,7 @@ import 
org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TranslationUtils; import org.apache.beam.runners.spark.translation.WindowingHelpers; -import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; @@ -254,7 +254,7 @@ final class StreamingTransformTranslator { .getDStream(); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); - final SparkPCollectionView pviews = context.getPviews(); + final SparkPCollectionView pviews = context.getPViews(); JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform( new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>, @@ -296,7 +296,7 @@ final class StreamingTransformTranslator { final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final boolean hasDefault = transform.isInsertDefault(); - final SparkPCollectionView pviews = context.getPviews(); + final SparkPCollectionView pviews = context.getPViews(); JavaDStream<WindowedValue<InputT>> dStream = ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream(); @@ -306,7 +306,7 @@ final class StreamingTransformTranslator { @Override public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd) throws Exception { - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), JavaSparkContext.fromSparkContext(rdd.context()), pviews); @@ -336,7 +336,7 @@ final class StreamingTransformTranslator { CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); - final SparkPCollectionView pviews = context.getPviews(); + final SparkPCollectionView pviews = context.getPViews(); JavaDStream<WindowedValue<KV<K, InputT>>> dStream = ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream(); @@ -347,7 +347,7 @@ final class StreamingTransformTranslator { @Override public JavaRDD<WindowedValue<KV<K, OutputT>>> call( JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception { - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), JavaSparkContext.fromSparkContext(rdd.context()), pviews); @@ -371,7 +371,7 @@ final class StreamingTransformTranslator { final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - final SparkPCollectionView pviews = context.getPviews(); + final SparkPCollectionView pviews = context.getPViews(); JavaDStream<WindowedValue<InputT>> dStream = ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream(); @@ -387,7 +387,7 @@ final class StreamingTransformTranslator { final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(jsc); - final Map<TupleTag<?>, 
KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), jsc, pviews); return rdd.mapPartitions( @@ -409,7 +409,7 @@ final class StreamingTransformTranslator { final DoFn<InputT, OutputT> doFn = transform.getFn(); rejectStateAndTimers(doFn); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); - final SparkPCollectionView pviews = context.getPviews(); + final SparkPCollectionView pviews = context.getPViews(); final WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); @SuppressWarnings("unchecked") @@ -424,7 +424,7 @@ final class StreamingTransformTranslator { final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context())); - final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = + final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), JavaSparkContext.fromSparkContext(rdd.context()), pviews); return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn, http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java deleted file mode 100644 index 946f786..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.util; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.broadcast.Broadcast; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Broadcast helper. - */ -public class BroadcastHelper<T> implements Serializable { - - /** - * If the property {@code beam.spark.directBroadcast} is set to - * {@code true} then Spark serialization (Kryo) will be used to broadcast values - * in View objects. By default this property is not set, and values are coded using - * the appropriate {@link Coder}. 
- */ - private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class); - private Broadcast<byte[]> bcast; - private final Coder<T> coder; - private transient T value; - private transient byte[] bytes = null; - - private BroadcastHelper(byte[] bytes, Coder<T> coder) { - this.bytes = bytes; - this.coder = coder; - } - - public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) { - return new BroadcastHelper<>(bytes, coder); - } - - public synchronized T getValue() { - if (value == null) { - value = deserialize(); - } - return value; - } - - public void broadcast(JavaSparkContext jsc) { - this.bcast = jsc.broadcast(bytes); - } - - public void unpersist() { - this.bcast.unpersist(); - } - - private T deserialize() { - T val; - try { - val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true)); - } catch (IOException ioe) { - // this should not ever happen, log it if it does. - LOG.warn(ioe.getMessage()); - val = null; - } - return val; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java new file mode 100644 index 0000000..1fd2ea8 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broadcast helper for side inputs. 
Helps to do the transformation from + * bytes transform to broadcast transform to value by coder + */ +public class SideInputBroadcast<T> implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(SideInputBroadcast.class); + private Broadcast<byte[]> bcast; + private final Coder<T> coder; + private transient T value; + private transient byte[] bytes = null; + + private SideInputBroadcast(byte[] bytes, Coder<T> coder) { + this.bytes = bytes; + this.coder = coder; + } + + public static <T> SideInputBroadcast<T> create(byte[] bytes, Coder<T> coder) { + return new SideInputBroadcast<>(bytes, coder); + } + + public synchronized T getValue() { + if (value == null) { + value = deserialize(); + } + return value; + } + + public void broadcast(JavaSparkContext jsc) { + this.bcast = jsc.broadcast(bytes); + } + + public void unpersist() { + this.bcast.unpersist(); + } + + private T deserialize() { + T val; + try { + val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true)); + } catch (IOException ioe) { + // this should not ever happen, log it if it does. + LOG.warn(ioe.getMessage()); + val = null; + } + return val; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index 8167ee0..c8e9850 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -37,10 +37,10 @@ import org.apache.beam.sdk.values.TupleTag; * A {@link SideInputReader} for thw SparkRunner. */ public class SparkSideInputReader implements SideInputReader { - private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs; + private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; - public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> - sideInputs) { + public SparkSideInputReader( + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs) { this.sideInputs = sideInputs; } @@ -49,7 +49,7 @@ public class SparkSideInputReader implements SideInputReader { public <T> T get(PCollectionView<T> view, BoundedWindow window) { //--- validate sideInput. checkNotNull(view, "The PCollectionView passed to sideInput cannot be null "); - KV<WindowingStrategy<?, ?>, BroadcastHelper<?>> windowedBroadcastHelper = + KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper = sideInputs.get(view.getTagInternal()); checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available."); @@ -61,7 +61,6 @@ public class SparkSideInputReader implements SideInputReader { //--- match the appropriate sideInput window. // a tag will point to all matching sideInputs, that is all windows. // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it. 
- @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> availableSideInputs = (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue(); Iterable<WindowedValue<?>> sideInputForWindow = http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 352a7d8..7346bd9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -178,8 +178,7 @@ public class ResumeFromCheckpointStreamingTest { .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { @ProcessElement public void process(ProcessContext c) { - - // Check side input is passed correctly + // Check side input is passed correctly also after resuming from checkpoint Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED)); c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index bfb4988..35b5ed3 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -125,32 +125,6 @@ </Match> <Match> - <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/> - <Or> - <Field name="bcast" /> - <Field name="value" /> - </Or> - <Bug pattern="IS2_INCONSISTENT_SYNC"/> - <!-- - Spark's Broadcast variables are a distributed and cached objects - and should not be treated as "normal" objects. - --> - </Match> - - <Match> - <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/> - <Or> - <Field name="bcast" /> - <Field name="value" /> - </Or> - <Bug pattern="IS2_INCONSISTENT_SYNC"/> - <!-- - Spark's Broadcast variables are a distributed and cached objects - and should not be treated as "normal" objects. - --> - </Match> - - <Match> <Class name="org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink"/> <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/> <!-- Intentionally overriding parent name because inheritors should replace the parent. -->