Repository: beam Updated Branches: refs/heads/master 11a381b23 -> 59451bca6
[BEAM-1625] BoundedDataset action() does not materialize RDD Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4febd954 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4febd954 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4febd954 Branch: refs/heads/master Commit: 4febd954af00458032efbee45b7f9724fe0ea9ed Parents: 11a381b Author: Aviem Zur <aviem...@gmail.com> Authored: Sun Mar 5 16:17:35 2017 +0200 Committer: Aviem Zur <aviem...@gmail.com> Committed: Sun Mar 5 16:17:35 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/spark/translation/BoundedDataset.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4febd954/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 5e19846..7db04a8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -107,9 +106,8 @@ public class BoundedDataset<T> implements Dataset { @Override public void action() { // Empty function to force computation of RDD. - rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() { - @Override - public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception { + rdd.foreach(new VoidFunction<WindowedValue<T>>() { + @Override public void call(WindowedValue<T> tWindowedValue) throws Exception { // Empty implementation. } });