Watermark tests.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9784f204
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9784f204
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9784f204

Branch: refs/heads/master
Commit: 9784f204793ab8e8ec3ec84e3c7c8d2ca4ddaf6a
Parents: c18f8a2
Author: Sela <ans...@paypal.com>
Authored: Sun Feb 12 18:34:30 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Feb 20 11:30:15 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/ClearWatermarksRule.java |  37 ++++
 .../beam/runners/spark/ReuseSparkContext.java   |  46 ++++
 .../beam/runners/spark/WatermarkTest.java       | 212 +++++++++++++++++++
 3 files changed, 295 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java
new file mode 100644
index 0000000..4c0c99a
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link GlobalWatermarkHolder}.
+ */
+public class ClearWatermarksRule extends ExternalResource {
+
+  @Override
+  protected void before() throws Throwable {
+    clearWatermarks();
+  }
+
+  public void clearWatermarks() {
+    GlobalWatermarkHolder.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
new file mode 100644
index 0000000..027f9fd
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) 
in tests.
+ */
+public class ReuseSparkContext extends ExternalResource {
+
+  private final boolean reuse;
+
+  private ReuseSparkContext(boolean reuse) {
+    this.reuse = reuse;
+  }
+
+  public static ReuseSparkContext no() {
+    return new ReuseSparkContext(false);
+  }
+
+  public static ReuseSparkContext yes() {
+    return new ReuseSparkContext(true);
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, 
Boolean.toString(reuse));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
new file mode 100644
index 0000000..0b56403
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
@@ -0,0 +1,212 @@
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+/**
+ * A test suite for the propagation of watermarks in the Spark runner.
+ */
+public class WatermarkTest {
+
+  @Rule
+  public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public ReuseSparkContext reuseContext = ReuseSparkContext.yes();
+
+  private static final SparkPipelineOptions options =
+      PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+
+  private static final String INSTANT_PATTERN =
+      "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z";
+
+  @Test
+  public void testLowHighWatermarksAdvance() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    // low == high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(5)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+    // low < high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(10)),
+            instant.plus(Duration.millis(15)),
+            instant.plus(Duration.millis(100))));
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks in Broadcast.
+    SparkWatermarks currentWatermarks = 
GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(currentWatermarks.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(10))));
+    assertThat(currentWatermarks.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(15))));
+    assertThat(currentWatermarks.getSynchronizedProcessingTime(),
+        equalTo(instant.plus(Duration.millis(100))));
+
+    // assert illegal watermark advance.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        RegexMatcher.matches(
+            "Low watermark " + INSTANT_PATTERN + " cannot be later then high 
watermark "
+            + INSTANT_PATTERN));
+    // low > high -> not allowed!
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(25)),
+            instant.plus(Duration.millis(20)),
+            instant.plus(Duration.millis(200))));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testSynchronizedTimeMonotonic() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Synchronized processing time must advance.");
+    // no actual advancement of watermarks - fine by Watermarks
+    // but not by synchronized processing time.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testMultiSource() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.add(2,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(3)),
+            instant.plus(Duration.millis(6)),
+            instant));
+
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks for source 1.
+    SparkWatermarks watermarksForSource1 = 
GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(watermarksForSource1.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(5))));
+    assertThat(watermarksForSource1.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(10))));
+
+    // assert watermarks for source 2.
+    SparkWatermarks watermarksForSource2 = 
GlobalWatermarkHolder.get().getValue().get(2);
+    assertThat(watermarksForSource2.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(3))));
+    assertThat(watermarksForSource2.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(6))));
+  }
+
+  @Test
+  public void testInDoFn() {
+    // because watermark advances onBatchCompleted.
+    Iterable<Integer> zeroBatch = Collections.emptyList();
+    Iterable<Integer> firstBatch = Collections.singletonList(1);
+    Iterable<Integer> secondBatch = Collections.singletonList(2);
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(10)),
+            instant.plus(Duration.millis(15)),
+            instant.plus(options.getBatchIntervalMillis())));
+
+    options.setRunner(SparkRunner.class);
+    options.setStreaming(true);
+    options.setBatchIntervalMillis(500L);
+    Pipeline p = Pipeline.create(options);
+
+    CreateStream.QueuedValues<Integer> queueStream =
+        CreateStream.fromQueue(Arrays.asList(zeroBatch, firstBatch, 
secondBatch));
+
+    p.apply(queueStream).setCoder(VarIntCoder.of()).apply(ParDo.of(new 
WatermarksDoFn(1)));
+
+    
p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3));
+
+    System.out.println(WatermarksDoFn.strings);
+
+    // this is a hacky way to assert but it will do until triggers are 
supported.
+    assertThat(
+        WatermarksDoFn.strings,
+        containsInAnyOrder(
+            "element: 1 lowWatermark: 5 highWatermark: 10 processingTime: 0",
+            "element: 2 lowWatermark: 10 highWatermark: 15 processingTime: 
1000"));
+  }
+
+  private static class WatermarksDoFn extends DoFn<Integer, String> {
+    private final int sourceId;
+
+    static List<String> strings = new ArrayList<>();
+
+    private WatermarksDoFn(int sourceId) {
+      this.sourceId = sourceId;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (GlobalWatermarkHolder.get() == null
+          || GlobalWatermarkHolder.get().getValue().get(sourceId) == null) {
+        // watermark not yet updated.
+        return;
+      }
+      SparkWatermarks sparkWatermarks = 
GlobalWatermarkHolder.get().getValue().get(sourceId);
+      Integer element = c.element();
+      String output =
+          "element: " + element
+          + " lowWatermark: " + sparkWatermarks.getLowWatermark().getMillis()
+          + " highWatermark: " + sparkWatermarks.getHighWatermark().getMillis()
+          + " processingTime: " + 
sparkWatermarks.getSynchronizedProcessingTime().getMillis();
+      strings.add(output);
+    }
+  }
+
+}

Reply via email to