Ingest the input watermarks into the GlobalWatermarkHolder.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bbf3744d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bbf3744d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bbf3744d

Branch: refs/heads/master
Commit: bbf3744d4cc1a6a58712b4c54c421b0009c5bb5e
Parents: a620653
Author: Sela <ans...@paypal.com>
Authored: Sun Feb 12 18:30:47 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/io/SparkUnboundedSource.java  | 69 ++++++++++++++------
 .../spark/stateful/StateSpecFunctions.java      | 31 ++++++---
 2 files changed, 72 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bbf3744d/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index f03dc8c..354461f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -24,8 +24,13 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.JavaRDD;
@@ -55,7 +60,7 @@ import scala.runtime.BoxedUnit;
  * <p>This read is a composite of the following steps:
  * <ul>
  * <li>Create a single-element (per-partition) stream, that contains the (partitioned)
- * {@link Source} and an optional {@link UnboundedSource.CheckpointMark} to start from.</li>
+ * {@link Source} and an optional {@link CheckpointMark} to start from.</li>
  * <li>Read from within a stateful operation {@link JavaPairInputDStream#mapWithState(StateSpec)}
  * using the {@link StateSpecFunctions#mapSourceFunction(SparkRuntimeContext)} mapping function,
  * which manages the state of the CheckpointMark per partition.</li>
@@ -65,10 +70,11 @@ import scala.runtime.BoxedUnit;
  */
 public class SparkUnboundedSource {
 
-  public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-  JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc,
-                                     SparkRuntimeContext rc,
-                                     UnboundedSource<T, CheckpointMarkT> source) {
+  public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
+      JavaStreamingContext jssc,
+      SparkRuntimeContext rc,
+      UnboundedSource<T, CheckpointMarkT> source) {
+
     SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
     Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
     SourceDStream<T, CheckpointMarkT> sourceDStream = new SourceDStream<>(jssc.ssc(), source, rc);
@@ -82,7 +88,7 @@ public class SparkUnboundedSource {
                 JavaSparkContext$.MODULE$.<CheckpointMarkT>fakeClassTag());
 
     // call mapWithState to read from a checkpointable sources.
-    JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
+    JavaMapWithStateDStream<Source<T>, CheckpointMarkT, Tuple2<byte[], Instant>,
         Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
             StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
 
@@ -109,13 +115,14 @@ public class SparkUnboundedSource {
         WindowedValue.FullWindowedValueCoder.of(
             source.getDefaultOutputCoder(),
             GlobalWindow.Coder.INSTANCE);
-    return mapWithStateDStream.flatMap(
+    JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
         new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
           @Override
           public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
             return t2._1();
           }
         }).map(CoderHelpers.fromByteFunction(coder));
+    return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
   }
 
   private static <T> String getSourceName(Source<T> source, int id) {
@@ -173,30 +180,46 @@ public class SparkUnboundedSource {
       // compute parent.
       scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
       long count = 0;
-      Instant globalWatermark = new Instant(Long.MIN_VALUE);
+      SparkWatermarks sparkWatermark = null;
+      Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
       if (parentRDDOpt.isDefined()) {
         JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
         for (Metadata metadata: parentRDD.collect()) {
           count += metadata.getNumRecords();
-          // a monotonically increasing watermark.
-          globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
-              ? metadata.getWatermark() : globalWatermark;
+          // compute the global input watermark - advance to latest of all partitions.
+          Instant partitionLowWatermark = metadata.getLowWatermark();
+          globalLowWatermarkForBatch =
+              globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
+                  ? partitionLowWatermark : globalLowWatermarkForBatch;
+          Instant partitionHighWatermark = metadata.getHighWatermark();
+          globalHighWatermarkForBatch =
+              globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
+                  ? partitionHighWatermark : globalHighWatermarkForBatch;
         }
+
+        sparkWatermark =
+            new SparkWatermarks(
+                globalLowWatermarkForBatch,
+                globalHighWatermarkForBatch,
+                new Instant(validTime.milliseconds()));
+        // add to watermark queue.
+        GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
       }
       // report - for RateEstimator and visibility.
-      report(validTime, count, globalWatermark);
+      report(validTime, count, sparkWatermark);
       return scala.Option.empty();
     }
 
-    private void report(Time batchTime, long count, Instant watermark) {
+    private void report(Time batchTime, long count, SparkWatermarks sparkWatermark) {
       // metadata - #records read and a description.
       scala.collection.immutable.Map<String, Object> metadata =
           new scala.collection.immutable.Map.Map1<String, Object>(
               StreamInputInfo.METADATA_KEY_DESCRIPTION(),
               String.format(
-                  "Read %d records with observed watermark %s, from %s for batch time: %s",
+                  "Read %d records with observed watermarks %s, from %s for batch time: %s",
                   count,
-                  watermark,
+                  sparkWatermark == null ? "N/A" : sparkWatermark,
                   sourceName,
                   batchTime));
       StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
@@ -209,19 +232,25 @@ public class SparkUnboundedSource {
    */
   public static class Metadata implements Serializable {
     private final long numRecords;
-    private final Instant watermark;
+    private final Instant lowWatermark;
+    private final Instant highWatermark;
 
-    public Metadata(long numRecords, Instant watermark) {
+    public Metadata(long numRecords, Instant lowWatermark, Instant highWatermark) {
       this.numRecords = numRecords;
-      this.watermark = watermark;
+      this.lowWatermark = lowWatermark;
+      this.highWatermark = highWatermark;
     }
 
     public long getNumRecords() {
       return numRecords;
     }
 
-    public Instant getWatermark() {
-      return watermark;
+    public Instant getLowWatermark() {
+      return lowWatermark;
+    }
+
+    public Instant getHighWatermark() {
+      return highWatermark;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bbf3744d/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ffe0ddd..ae5a746 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -94,29 +95,36 @@ public class StateSpecFunctions {
    * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
    */
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-  scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
+  scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
       Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
            final SparkRuntimeContext runtimeContext) {
 
-    return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
-        Tuple2<Iterable<byte[]>, Metadata>>() {
+    return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>,
+        State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() {
 
       @Override
       public Tuple2<Iterable<byte[]>, Metadata> apply(
           Source<T> source,
           scala.Option<CheckpointMarkT> startCheckpointMark,
-          State<byte[]> state) {
+          State<Tuple2<byte[], Instant>> state) {
+
         // source as MicrobatchSource
         MicrobatchSource<T, CheckpointMarkT> microbatchSource =
             (MicrobatchSource<T, CheckpointMarkT>) source;
 
+        // Initial high/low watermarks.
+        Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        Instant highWatermark;
+
         // if state exists, use it, otherwise it's first time so use the startCheckpointMark.
         // startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
         // Optional(null)), which is handled by the UnboundedSource implementation.
         Coder<CheckpointMarkT> checkpointCoder = microbatchSource.getCheckpointMarkCoder();
         CheckpointMarkT checkpointMark;
         if (state.exists()) {
-          checkpointMark = CoderHelpers.fromByteArray(state.get(), checkpointCoder);
+          // previous (output) watermark is now the low watermark.
+          lowWatermark = state.get()._2();
+          checkpointMark = CoderHelpers.fromByteArray(state.get()._1(), checkpointCoder);
           LOG.info("Continue reading from an existing CheckpointMark.");
         } else if (startCheckpointMark.isDefined()
             && !startCheckpointMark.get().equals(EmptyCheckpointMark.get())) {
@@ -154,7 +162,10 @@ public class StateSpecFunctions {
             finished = !reader.advance();
           }
 
-          watermark = ((MicrobatchSource.Reader) reader).getWatermark();
+          // end-of-read watermark is the high watermark, but don't allow decrease.
+          Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
+          highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
+
           // close and checkpoint reader.
           reader.close();
           LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -164,11 +175,15 @@ public class StateSpecFunctions {
           @SuppressWarnings("unchecked")
           CheckpointMarkT finishedReadCheckpointMark =
               (CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
+          byte[] codedCheckpoint = new byte[0];
           if (finishedReadCheckpointMark != null) {
-            state.update(CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder));
+            codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
           } else {
             LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
           }
+          // persist the end-of-read (high) watermark for following read, where it will become
+          // the next low watermark.
+          state.update(new Tuple2<>(codedCheckpoint, highWatermark));
         } catch (IOException e) {
           throw new RuntimeException("Failed to read from reader.", e);
         }
@@ -179,7 +194,7 @@ public class StateSpecFunctions {
             return Iterators.unmodifiableIterator(readValues.iterator());
           }
         };
-        return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
+        return new Tuple2<>(iterable, new Metadata(readValues.size(), lowWatermark, highWatermark));
       }
     };
   }

Reply via email to