Repository: beam Updated Branches: refs/heads/master 69d951225 -> 34b38ef95
[BEAM-1623] Transform Reshuffle directly in Spark runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8bc618e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8bc618e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8bc618e Branch: refs/heads/master Commit: d8bc618edafd07ae8e0ec692fc7f3df7395b876e Parents: 69d9512 Author: Aviem Zur <aviem...@gmail.com> Authored: Sun Mar 5 07:15:32 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Mon Mar 6 11:19:22 2017 +0200 ---------------------------------------------------------------------- .../translation/GroupCombineFunctions.java | 22 ++++++++++++ .../spark/translation/TransformTranslator.java | 38 +++++++++++++++----- .../spark/translation/TranslationUtils.java | 10 ++++++ .../streaming/StreamingTransformTranslator.java | 36 +++++++++++++++++++ 4 files changed, 97 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index 1e879ce..b2a589d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -203,4 +203,26 @@ public class GroupCombineFunctions { return accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder)); } + + /** + * An implementation of + * {@link org.apache.beam.sdk.util.Reshuffle} for the Spark runner, using {@code repartition}.
+ */ + public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle( + JavaRDD<WindowedValue<KV<K, V>>> rdd, + Coder<K> keyCoder, + WindowedValueCoder<V> wvCoder) { + + // Use coders to convert objects in the PCollection to byte arrays, so they + // can be transferred over the network for the shuffle. + return rdd + .map(new ReifyTimestampsAndWindowsFunction<K, V>()) + .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction()) + .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction()) + .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder)) + .repartition(rdd.getNumPartitions()) + .mapToPair(CoderHelpers.fromByteFunction(keyCoder, wvCoder)) + .map(TranslationUtils.<K, WindowedValue<V>>fromPairFunction()) + .map(TranslationUtils.<K, V>toKVByWindowInValue()); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index a4939b9..0ae7313 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -318,15 +319,7 @@ public final class TransformTranslator { return sparkCombineFn.extractOutput(iter); }
}).map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction()) - .map(new Function<KV<K, WindowedValue<OutputT>>, - WindowedValue<KV<K, OutputT>>>() { - @Override - public WindowedValue<KV<K, OutputT>> call( - KV<K, WindowedValue<OutputT>> kv) throws Exception { - WindowedValue<OutputT> wv = kv.getValue(); - return wv.withValue(KV.of(kv.getKey(), wv.getValue())); - } - }); + .map(TranslationUtils.<K, OutputT>toKVByWindowInValue()); context.putDataset(transform, new BoundedDataset<>(outRdd)); } @@ -735,6 +728,32 @@ public final class TransformTranslator { }; } + private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() { + return new TransformEvaluator<Reshuffle<K, V>>() { + @Override public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") + JavaRDD<WindowedValue<KV<K, V>>> inRDD = + ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD(); + @SuppressWarnings("unchecked") + final WindowingStrategy<?, W> windowingStrategy = + (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy(); + @SuppressWarnings("unchecked") + final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); + @SuppressWarnings("unchecked") + final WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn(); + + final Coder<K> keyCoder = coder.getKeyCoder(); + final WindowedValue.WindowedValueCoder<V> wvCoder = + WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder()); + + JavaRDD<WindowedValue<KV<K, V>>> reshuffled = + GroupCombineFunctions.reshuffle(inRDD, keyCoder, wvCoder); + + context.putDataset(transform, new BoundedDataset<>(reshuffled)); + } + }; + } + private static final Map<Class<?
extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps .newHashMap(); @@ -753,6 +772,7 @@ public final class TransformTranslator { EVALUATORS.put(View.AsIterable.class, viewAsIter()); EVALUATORS.put(View.CreatePCollectionView.class, createPCollView()); EVALUATORS.put(Window.Assign.class, window()); + EVALUATORS.put(Reshuffle.class, reshuffle()); // mostly test evaluators EVALUATORS.put(StorageLevelPTransform.class, storageLevel()); } http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 158593e..f2b3418 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -159,6 +159,16 @@ public final class TranslationUtils { }; } + /** Convert {@code KV<K, WindowedValue<V>>} to {@code WindowedValue<KV<K, V>>}. */ + static <K, V> Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K, V>>> toKVByWindowInValue() { + return new Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K, V>>>() { + @Override public WindowedValue<KV<K, V>> call(KV<K, WindowedValue<V>> kv) throws Exception { + WindowedValue<V> wv = kv.getValue(); + return wv.withValue(KV.of(kv.getKey(), wv.getValue())); + } + }; + } + /** * A utility class to filter {@link TupleTag}s.
 * http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 628b713..31307cc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -73,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -445,6 +446,40 @@ final class StreamingTransformTranslator { }; } + private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() { + return new TransformEvaluator<Reshuffle<K, V>>() { + @Override + public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") UnboundedDataset<KV<K, V>> inputDataset = + (UnboundedDataset<KV<K, V>>) context.borrowDataset(transform); + List<Integer> streamSources = inputDataset.getStreamSources(); + JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream(); + @SuppressWarnings("unchecked") + final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); + @SuppressWarnings("unchecked") + final WindowingStrategy<?, W> windowingStrategy = + (WindowingStrategy<?, W>)
context.getInput(transform).getWindowingStrategy(); + @SuppressWarnings("unchecked") + final WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn(); + + final WindowedValue.WindowedValueCoder<V> wvCoder = + WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder()); + + JavaDStream<WindowedValue<KV<K, V>>> reshuffledStream = + dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>, + JavaRDD<WindowedValue<KV<K, V>>>>() { + @Override + public JavaRDD<WindowedValue<KV<K, V>>> call( + JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception { + return GroupCombineFunctions.reshuffle(rdd, coder.getKeyCoder(), wvCoder); + } + }); + + context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources)); + } + }; + } + private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap(); @@ -457,6 +492,7 @@ final class StreamingTransformTranslator { EVALUATORS.put(CreateStream.class, createFromQueue()); EVALUATORS.put(Window.Assign.class, window()); EVALUATORS.put(Flatten.PCollections.class, flattenPColl()); + EVALUATORS.put(Reshuffle.class, reshuffle()); } /**