Repository: beam Updated Branches: refs/heads/master 769398e40 -> 2a40534e8
Extracted anonymous function classes into static nested classes so that they do not capture the enclosing scope. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3876f83a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3876f83a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3876f83a Branch: refs/heads/master Commit: 3876f83a82845e3c0a41152cf7a7c58378d994e7 Parents: 769398e Author: Stas Levin <stasle...@apache.org> Authored: Wed Mar 29 15:29:20 2017 +0300 Committer: Stas Levin <stasle...@apache.org> Committed: Thu Mar 30 14:53:38 2017 +0300 ---------------------------------------------------------------------- .../runners/spark/io/SparkUnboundedSource.java | 37 ++++++++++++-------- .../spark/stateful/StateSpecFunctions.java | 17 +++++---- 2 files changed, 31 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index a538907..162bca4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -102,13 +102,7 @@ public class SparkUnboundedSource { // report the number of input elements for this InputDStream to the InputInfoTracker. 
int id = inputDStream.inputDStream().id(); - JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map( - new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() { - @Override - public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception { - return t2._2(); - } - }); + JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction()); // register ReadReportDStream to report information related to this read. new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register(); @@ -118,13 +112,10 @@ public class SparkUnboundedSource { WindowedValue.FullWindowedValueCoder.of( source.getDefaultOutputCoder(), GlobalWindow.Coder.INSTANCE); - JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap( - new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() { - @Override - public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception { - return t2._1(); - } - }).map(CoderHelpers.fromByteFunction(coder)); + JavaDStream<WindowedValue<T>> readUnboundedStream = + mapWithStateDStream + .flatMap(new Tuple2byteFlatMapFunction()) + .map(CoderHelpers.fromByteFunction(coder)); return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id)); } @@ -274,4 +265,22 @@ public class SparkUnboundedSource { return metricsContainer; } } + + private static class Tuple2MetadataFunction + implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> { + + @Override + public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception { + return t2._2(); + } + } + + private static class Tuple2byteFlatMapFunction + implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> { + + @Override + public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception { + return t2._1(); + } + } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index ec4fce3..803fe45 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.stateful; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -197,14 +197,13 @@ public class StateSpecFunctions { throw new RuntimeException("Failed to read from reader.", e); } - Iterable <byte[]> iterable = new Iterable<byte[]>() { - @Override - public Iterator<byte[]> iterator() { - return Iterators.unmodifiableIterator(readValues.iterator()); - } - }; - return new Tuple2<>(iterable, - new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer)); + final ArrayList<byte[]> payload = + Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator())); + + return new Tuple2<>( + (Iterable<byte[]>) payload, + new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer)); + } catch (IOException e) { throw new RuntimeException(e); }