Repository: beam Updated Branches: refs/heads/master 348d33588 -> c9e55a436
[BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482 Branch: refs/heads/master Commit: b32f0482784b9df7ce67226b32febe6e664a45b6 Parents: 348d335 Author: Aviem Zur <aviem...@gmail.com> Authored: Sat Mar 25 21:49:06 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Sun Mar 26 10:31:40 2017 +0300 ---------------------------------------------------------------------- .../translation/GroupCombineFunctions.java | 15 ++++++----- .../spark/translation/TransformTranslator.java | 26 ++++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index b2a589d..917a9ee 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -18,8 +18,7 @@ package org.apache.beam.runners.spark.translation; -import static com.google.common.base.Preconditions.checkArgument; - +import com.google.common.base.Optional; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.Coder; @@ -67,14 +66,12 @@ public class GroupCombineFunctions { /** * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation. */ - public static <InputT, AccumT> Iterable<WindowedValue<AccumT>> combineGlobally( + public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally( JavaRDD<WindowedValue<InputT>> rdd, final SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn, final Coder<InputT> iCoder, final Coder<AccumT> aCoder, final WindowingStrategy<?, ?> windowingStrategy) { - checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be skipped for empty RDDs."); - // coders. final WindowedValue.FullWindowedValueCoder<InputT> wviCoder = WindowedValue.FullWindowedValueCoder.of(iCoder, @@ -93,6 +90,11 @@ public class GroupCombineFunctions { //---- AccumT: A //---- InputT: I JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder)); + + if (inputRDDBytes.isEmpty()) { + return Optional.absent(); + } + /*Itr<WV<A>>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate( CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder), new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() { @@ -115,7 +117,8 @@ public class GroupCombineFunctions { } } ); - return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder); + + return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b4362b0..ffb207a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -27,6 +27,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable; import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -259,9 +260,20 @@ public final class TransformTranslator { ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); JavaRDD<WindowedValue<OutputT>> outRdd; - // handle empty input RDD, which will naturally skip the entire execution - // as Spark will not run on empty RDDs. - if (inRdd.isEmpty()) { + + Optional<Iterable<WindowedValue<AccumT>>> maybeAccumulated = + GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, iCoder, aCoder, + windowingStrategy); + + if (maybeAccumulated.isPresent()) { + Iterable<WindowedValue<OutputT>> output = + sparkCombineFn.extractOutput(maybeAccumulated.get()); + outRdd = context.getSparkContext() + .parallelize(CoderHelpers.toByteArrays(output, wvoCoder)) + .map(CoderHelpers.fromByteFunction(wvoCoder)); + } else { + // handle empty input RDD, which will naturally skip the entire execution + // as Spark will not run on empty RDDs. JavaSparkContext jsc = new JavaSparkContext(inRdd.context()); if (hasDefault) { OutputT defaultValue = combineFn.defaultValue(); @@ -272,14 +284,8 @@ public final class TransformTranslator { } else { outRdd = jsc.emptyRDD(); } - } else { - Iterable<WindowedValue<AccumT>> accumulated = GroupCombineFunctions.combineGlobally( - inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy); - Iterable<WindowedValue<OutputT>> output = sparkCombineFn.extractOutput(accumulated); - outRdd = context.getSparkContext() - .parallelize(CoderHelpers.toByteArrays(output, wvoCoder)) - .map(CoderHelpers.fromByteFunction(wvoCoder)); } + context.putDataset(transform, new BoundedDataset<>(outRdd)); }