Repository: beam
Updated Branches:
  refs/heads/master 92190ba5d -> aa45ccb08


A global Watermark holder to update and broadcast to workers.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6206535
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6206535
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6206535

Branch: refs/heads/master
Commit: a6206535ec84d8c01695368c8973a6577ae6d953
Parents: 92190ba
Author: Sela <ans...@paypal.com>
Authored: Sun Feb 12 18:28:19 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Feb 20 11:30:13 2017 +0200

----------------------------------------------------------------------
 .../spark/util/GlobalWatermarkHolder.java       | 200 +++++++++++++++++++
 1 file changed, 200 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a6206535/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
new file mode 100644
index 0000000..b215b5f
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.joda.time.Instant;
+
+
+/**
+ * A {@link Broadcast} variable to hold the global watermarks for a 
micro-batch.
+ *
+ * <p>For each source, holds a queue for the watermarks of each micro-batch 
that was read,
+ * and advances the watermarks according to the queue (first-in-first-out).
+ */
+public class GlobalWatermarkHolder {
+  // the broadcast is broadcasted to the workers.
+  private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = 
null;
+  // this should only live in the driver so transient.
+  private static final transient Map<Integer, Queue<SparkWatermarks>> 
sourceTimes = new HashMap<>();
+
+  public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
+    Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
+    if (timesQueue == null) {
+      timesQueue = new ConcurrentLinkedQueue<>();
+    }
+    timesQueue.offer(sparkWatermarks);
+    sourceTimes.put(sourceId, timesQueue);
+  }
+
+  @VisibleForTesting
+  public static void addAll(Map<Integer, Queue<SparkWatermarks>> sourceTimes) {
+    for (Map.Entry<Integer, Queue<SparkWatermarks>> en: 
sourceTimes.entrySet()) {
+      int sourceId = en.getKey();
+      Queue<SparkWatermarks> timesQueue = en.getValue();
+      while (!timesQueue.isEmpty()) {
+        add(sourceId, timesQueue.poll());
+      }
+    }
+  }
+
+  /**
+   * Returns the {@link Broadcast} containing the {@link SparkWatermarks} 
mapped
+   * to their sources.
+   */
+  public static Broadcast<Map<Integer, SparkWatermarks>> get() {
+    return broadcast;
+  }
+
+  /**
+   * Advances the watermarks to the next-in-line watermarks.
+   * SparkWatermarks are monotonically increasing.
+   */
+  public static void advance(JavaSparkContext jsc) {
+    synchronized (GlobalWatermarkHolder.class){
+      if (sourceTimes.isEmpty()) {
+        return;
+      }
+
+      // update all sources' watermarks into the new broadcast.
+      Map<Integer, SparkWatermarks> newBroadcast = new HashMap<>();
+
+      for (Map.Entry<Integer, Queue<SparkWatermarks>> en: 
sourceTimes.entrySet()) {
+        if (en.getValue().isEmpty()) {
+          continue;
+        }
+        Integer sourceId = en.getKey();
+        Queue<SparkWatermarks> timesQueue = en.getValue();
+
+        // current state, if exists.
+        Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        Instant currentSynchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
+        if (broadcast != null && broadcast.getValue().containsKey(sourceId)) {
+          SparkWatermarks currentTimes = broadcast.getValue().get(sourceId);
+          currentLowWatermark = currentTimes.getLowWatermark();
+          currentHighWatermark = currentTimes.getHighWatermark();
+          currentSynchronizedProcessingTime = 
currentTimes.getSynchronizedProcessingTime();
+        }
+
+        SparkWatermarks next = timesQueue.poll();
+        // advance watermarks monotonically.
+        Instant nextLowWatermark = 
next.getLowWatermark().isAfter(currentLowWatermark)
+            ? next.getLowWatermark() : currentLowWatermark;
+        Instant nextHighWatermark = 
next.getHighWatermark().isAfter(currentHighWatermark)
+            ? next.getHighWatermark() : currentHighWatermark;
+        Instant nextSynchronizedProcessingTime = 
next.getSynchronizedProcessingTime();
+        checkState(!nextLowWatermark.isAfter(nextHighWatermark),
+            String.format(
+                "Low watermark %s cannot be later then high watermark %s",
+                nextLowWatermark, nextHighWatermark));
+        
checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
+            "Synchronized processing time must advance.");
+        newBroadcast.put(
+            sourceId,
+            new SparkWatermarks(
+                nextLowWatermark, nextHighWatermark, 
nextSynchronizedProcessingTime));
+      }
+
+      // update the watermarks broadcast only if something has changed.
+      if (!newBroadcast.isEmpty()) {
+        if (broadcast != null) {
+          // for now this is blocking, we could make this asynchronous
+          // but it could slow down WM propagation.
+          broadcast.unpersist(true);
+        }
+        broadcast = jsc.broadcast(newBroadcast);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static synchronized void clear() {
+    sourceTimes.clear();
+    broadcast = null;
+  }
+
+  /**
+   * A {@link SparkWatermarks} holds the watermarks and batch time
+   * relevant to a micro-batch input from a specific source.
+   */
+  public static class SparkWatermarks implements Serializable {
+    private final Instant lowWatermark;
+    private final Instant highWatermark;
+    private final Instant synchronizedProcessingTime;
+
+    @VisibleForTesting
+    public SparkWatermarks(
+        Instant lowWatermark,
+        Instant highWatermark,
+        Instant synchronizedProcessingTime) {
+      this.lowWatermark = lowWatermark;
+      this.highWatermark = highWatermark;
+      this.synchronizedProcessingTime = synchronizedProcessingTime;
+    }
+
+    public Instant getLowWatermark() {
+      return lowWatermark;
+    }
+
+    public Instant getHighWatermark() {
+      return highWatermark;
+    }
+
+    public Instant getSynchronizedProcessingTime() {
+      return synchronizedProcessingTime;
+    }
+
+    @Override
+    public String toString() {
+      return "SparkWatermarks{"
+          + "lowWatermark=" + lowWatermark
+          + ", highWatermark=" + highWatermark
+          + ", synchronizedProcessingTime=" + synchronizedProcessingTime + '}';
+    }
+  }
+
+  /** Advance the WMs onBatchCompleted event. */
+  public static class WatermarksListener extends JavaStreamingListener {
+    private final JavaStreamingContext jssc;
+
+    public WatermarksListener(JavaStreamingContext jssc) {
+      this.jssc = jssc;
+    }
+
+    @Override
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted 
batchCompleted) {
+      GlobalWatermarkHolder.advance(jssc.sparkContext());
+    }
+  }
+}

Reply via email to