Ingest the input watermarks into the GlobalWatermarkHolder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bbf3744d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bbf3744d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bbf3744d

Branch: refs/heads/master
Commit: bbf3744d4cc1a6a58712b4c54c421b0009c5bb5e
Parents: a620653
Author: Sela <ans...@paypal.com>
Authored: Sun Feb 12 18:30:47 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/io/SparkUnboundedSource.java  | 69 ++++++++++++++------
 .../spark/stateful/StateSpecFunctions.java      | 31 ++++++---
 2 files changed, 72 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bbf3744d/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index f03dc8c..354461f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -24,8 +24,13 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.JavaRDD;
@@ -55,7 +60,7 @@ import scala.runtime.BoxedUnit;
  * <p>This read is a composite of the following steps:
  * <ul>
  * <li>Create a single-element (per-partition) stream, that contains the (partitioned)
- * {@link Source} and an optional {@link UnboundedSource.CheckpointMark} to start from.</li>
+ * {@link Source} and an optional {@link CheckpointMark} to start from.</li>
  * <li>Read from within a stateful operation {@link JavaPairInputDStream#mapWithState(StateSpec)}
  * using the {@link StateSpecFunctions#mapSourceFunction(SparkRuntimeContext)} mapping function,
  * which manages the state of the CheckpointMark per partition.</li>
@@ -65,10 +70,11 @@
  */
 public class SparkUnboundedSource {
 
-  public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc,
-                                         SparkRuntimeContext rc,
-                                         UnboundedSource<T, CheckpointMarkT> source) {
+  public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
+      JavaStreamingContext jssc,
+      SparkRuntimeContext rc,
+      UnboundedSource<T, CheckpointMarkT> source) {
+    SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
     Long maxRecordsPerBatch =
         options.getMaxRecordsPerBatch();
     SourceDStream<T, CheckpointMarkT> sourceDStream = new SourceDStream<>(jssc.ssc(), source, rc);
@@ -82,7 +88,7 @@ public class SparkUnboundedSource {
         JavaSparkContext$.MODULE$.<CheckpointMarkT>fakeClassTag());
 
     // call mapWithState to read from a checkpointable sources.
-    JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
+    JavaMapWithStateDStream<Source<T>, CheckpointMarkT, Tuple2<byte[], Instant>,
         Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
             StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
 
@@ -109,13 +115,14 @@ public class SparkUnboundedSource {
         WindowedValue.FullWindowedValueCoder.of(
             source.getDefaultOutputCoder(),
             GlobalWindow.Coder.INSTANCE);
-    return mapWithStateDStream.flatMap(
+    JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
         new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
           @Override
           public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
            return t2._1();
          }
        }).map(CoderHelpers.fromByteFunction(coder));
+    return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
   }
 
   private static <T> String getSourceName(Source<T> source, int id) {
@@ -173,30 +180,46 @@ public class SparkUnboundedSource {
       // compute parent.
       scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
       long count = 0;
-      Instant globalWatermark = new Instant(Long.MIN_VALUE);
+      SparkWatermarks sparkWatermark = null;
+      Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
       if (parentRDDOpt.isDefined()) {
         JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
         for (Metadata metadata: parentRDD.collect()) {
           count += metadata.getNumRecords();
-          // a monotonically increasing watermark.
-          globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
-              ? metadata.getWatermark() : globalWatermark;
+          // compute the global input watermark - advance to latest of all partitions.
+          Instant partitionLowWatermark = metadata.getLowWatermark();
+          globalLowWatermarkForBatch =
+              globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
+                  ? partitionLowWatermark : globalLowWatermarkForBatch;
+          Instant partitionHighWatermark = metadata.getHighWatermark();
+          globalHighWatermarkForBatch =
+              globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
+                  ? partitionHighWatermark : globalHighWatermarkForBatch;
         }
+
+        sparkWatermark =
+            new SparkWatermarks(
+                globalLowWatermarkForBatch,
+                globalHighWatermarkForBatch,
+                new Instant(validTime.milliseconds()));
+        // add to watermark queue.
+        GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
       }
       // report - for RateEstimator and visibility.
-      report(validTime, count, globalWatermark);
+      report(validTime, count, sparkWatermark);
       return scala.Option.empty();
     }
 
-    private void report(Time batchTime, long count, Instant watermark) {
+    private void report(Time batchTime, long count, SparkWatermarks sparkWatermark) {
      // metadata - #records read and a description.
      scala.collection.immutable.Map<String, Object> metadata =
          new scala.collection.immutable.Map.Map1<String, Object>(
              StreamInputInfo.METADATA_KEY_DESCRIPTION(),
              String.format(
-                  "Read %d records with observed watermark %s, from %s for batch time: %s",
+                  "Read %d records with observed watermarks %s, from %s for batch time: %s",
                  count,
-                  watermark,
+                  sparkWatermark == null ? "N/A" : sparkWatermark,
"N/A" : sparkWatermark, sourceName, batchTime)); StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata); @@ -209,19 +232,25 @@ public class SparkUnboundedSource { */ public static class Metadata implements Serializable { private final long numRecords; - private final Instant watermark; + private final Instant lowWatermark; + private final Instant highWatermark; - public Metadata(long numRecords, Instant watermark) { + public Metadata(long numRecords, Instant lowWatermark, Instant highWatermark) { this.numRecords = numRecords; - this.watermark = watermark; + this.lowWatermark = lowWatermark; + this.highWatermark = highWatermark; } public long getNumRecords() { return numRecords; } - public Instant getWatermark() { - return watermark; + public Instant getLowWatermark() { + return lowWatermark; + } + + public Instant getHighWatermark() { + return highWatermark; } } } http://git-wip-us.apache.org/repos/asf/beam/blob/bbf3744d/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index ffe0ddd..ae5a746 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -94,29 +95,36 @@ public class StateSpecFunctions { * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function. */ public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> - scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>, + scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction( final SparkRuntimeContext runtimeContext) { - return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>, - Tuple2<Iterable<byte[]>, Metadata>>() { + return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, + State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() { @Override public Tuple2<Iterable<byte[]>, Metadata> apply( Source<T> source, scala.Option<CheckpointMarkT> startCheckpointMark, - State<byte[]> state) { + State<Tuple2<byte[], Instant>> state) { + // source as MicrobatchSource MicrobatchSource<T, CheckpointMarkT> microbatchSource = (MicrobatchSource<T, CheckpointMarkT>) source; + // Initial high/low watermarks. + Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant highWatermark; + // if state exists, use it, otherwise it's first time so use the startCheckpointMark. // startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply // Optional(null)), which is handled by the UnboundedSource implementation. 
http://git-wip-us.apache.org/repos/asf/beam/blob/bbf3744d/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ffe0ddd..ae5a746 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -94,29 +95,36 @@
    * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
    */
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
+      scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
          Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
              final SparkRuntimeContext runtimeContext) {
 
-    return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
-        Tuple2<Iterable<byte[]>, Metadata>>() {
+    return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>,
+        State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() {
 
      @Override
      public Tuple2<Iterable<byte[]>, Metadata> apply(
          Source<T> source,
          scala.Option<CheckpointMarkT> startCheckpointMark,
-          State<byte[]> state) {
+          State<Tuple2<byte[], Instant>> state) {
+
        // source as MicrobatchSource
        MicrobatchSource<T, CheckpointMarkT> microbatchSource =
            (MicrobatchSource<T, CheckpointMarkT>) source;
 
+        // Initial high/low watermarks.
+        Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        Instant highWatermark;
+
        // if state exists, use it, otherwise it's first time so use the startCheckpointMark.
        // startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
        // Optional(null)), which is handled by the UnboundedSource implementation.
        Coder<CheckpointMarkT> checkpointCoder = microbatchSource.getCheckpointMarkCoder();
        CheckpointMarkT checkpointMark;
        if (state.exists()) {
-          checkpointMark = CoderHelpers.fromByteArray(state.get(), checkpointCoder);
+          // previous (output) watermark is now the low watermark.
+          lowWatermark = state.get()._2();
+          checkpointMark = CoderHelpers.fromByteArray(state.get()._1(), checkpointCoder);
          LOG.info("Continue reading from an existing CheckpointMark.");
        } else if (startCheckpointMark.isDefined()
            && !startCheckpointMark.get().equals(EmptyCheckpointMark.get())) {
@@ -154,7 +162,10 @@
            finished = !reader.advance();
          }
 
-          watermark = ((MicrobatchSource.Reader) reader).getWatermark();
+          // end-of-read watermark is the high watermark, but don't allow decrease.
+          Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
+          highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
+
          // close and checkpoint reader.
          reader.close();
          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -164,11 +175,15 @@
          @SuppressWarnings("unchecked")
          CheckpointMarkT finishedReadCheckpointMark =
              (CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
+          byte[] codedCheckpoint = new byte[0];
          if (finishedReadCheckpointMark != null) {
-            state.update(CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder));
+            codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
          } else {
            LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
          }
+          // persist the end-of-read (high) watermark for following read, where it will become
+          // the next low watermark.
+          state.update(new Tuple2<>(codedCheckpoint, highWatermark));
        } catch (IOException e) {
          throw new RuntimeException("Failed to read from reader.", e);
        }
@@ -179,7 +194,7 @@
            return Iterators.unmodifiableIterator(readValues.iterator());
          }
        };
-        return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
+        return new Tuple2<>(iterable, new Metadata(readValues.size(), lowWatermark, highWatermark));
      }
    };
  }
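The StateSpecFunctions hunks are the other half of the hand-off: the per-partition state value grows from byte[] to Tuple2<byte[], Instant> so that the watermark persisted at end-of-read (the high watermark) is read back as the next micro-batch's low watermark, and the high watermark is clamped so it never regresses even if the source reports an earlier one. A minimal sketch of just that hand-off (illustrative names throughout - a plain field stands in for Spark's State, and the byte[] checkpoint half of the tuple is elided):

import org.joda.time.Instant;

// Sketch of the low/high watermark hand-off that mapSourceFunction keeps in
// Spark state across micro-batches.
public class WatermarkHandOffSketch {

  // Stands in for the Instant half of State<Tuple2<byte[], Instant>>.
  private Instant persistedWatermark = new Instant(Long.MIN_VALUE);

  // One simulated micro-batch read; sourceWatermark stands in for
  // ((MicrobatchSource.Reader) reader).getWatermark() at end-of-read.
  Instant[] readBatch(Instant sourceWatermark) {
    Instant lowWatermark = persistedWatermark;  // previous high becomes the new low.
    Instant highWatermark =
        sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
    persistedWatermark = highWatermark;         // state.update(...) in the runner.
    return new Instant[] {lowWatermark, highWatermark};
  }

  public static void main(String[] args) {
    WatermarkHandOffSketch state = new WatermarkHandOffSketch();
    Instant[] first = state.readBatch(new Instant(3000L));   // low = sentinel, high = 3s
    Instant[] second = state.readBatch(new Instant(2000L));  // low = 3s, high = 3s (clamped)
    System.out.println(first[1] + " -> " + second[0] + " / " + second[1]);
  }
}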