Test triggers, panes and watermarks via CreateStream.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96d373fe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96d373fe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96d373fe

Branch: refs/heads/master
Commit: 96d373fe954c263414e7f22ab08979f25cc83188
Parents: 9f14350
Author: Sela <ans...@paypal.com>
Authored: Sat Feb 18 22:05:19 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/io/CreateStream.java     | 125 ++++--
 .../spark/GlobalWatermarkHolderTest.java        | 150 ++++++++
 .../beam/runners/spark/ReuseSparkContext.java   |  46 ---
 .../runners/spark/ReuseSparkContextRule.java    |  46 +++
 .../beam/runners/spark/WatermarkTest.java       | 231 -----------
 .../translation/streaming/CreateStreamTest.java | 385 +++++++++++++++++++
 .../ResumeFromCheckpointStreamingTest.java      | 296 ++++++++------
 7 files changed, 860 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
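
For orientation, a minimal usage sketch of the new fluent CreateStream API introduced in this
commit (pieced together from CreateStreamTest further down; the batch interval, timestamps and
element values are illustrative only, and, as the tests do, the interval should match the
pipeline's SparkPipelineOptions batch interval):

    import org.apache.beam.runners.spark.io.CreateStream;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Build a mock micro-batch stream: an empty first batch, then elements,
    // advancing the watermark between batches so on-time and late panes can fire.
    CreateStream<TimestampedValue<Integer>> source =
        CreateStream.<TimestampedValue<Integer>>withBatchInterval(Duration.millis(500))
            .nextBatch()
            .advanceWatermarkForNextBatch(new Instant(1000))
            .nextBatch(
                TimestampedValue.of(1, new Instant(0)),
                TimestampedValue.of(2, new Instant(0)))
            .advanceNextBatchWatermarkToInfinity();
    // Applied like any other root transform:
    //   p.apply(source).setCoder(...).apply(...);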


http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 7ebba90..2149372 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -17,57 +17,124 @@
  */
 package org.apache.beam.runners.spark.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
 
 /**
- * Create an input stream from Queue.
+ * Create an input stream from a {@link Queue}. For SparkRunner tests only.
  *
- * @param <T> stream type
+ * @param <T> stream type.
  */
-public final class CreateStream<T> {
+public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
+
+  private final Duration batchInterval;
+  private final Instant initialSystemTime;
+  private final Queue<Iterable<T>> batches = new LinkedList<>();
+  private final Deque<SparkWatermarks> times = new LinkedList<>();
+
+  private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; // for test purposes.
 
-  private CreateStream() {
+  private CreateStream(Duration batchInterval, Instant initialSystemTime) {
+    this.batchInterval = batchInterval;
+    this.initialSystemTime = initialSystemTime;
+  }
+
+  /** Set the batch interval for the stream. */
+  public static <T> CreateStream<T> withBatchInterval(Duration batchInterval) {
+    return new CreateStream<>(batchInterval, new Instant(0));
   }
 
   /**
-   * Define the input stream to create from queue.
-   *
-   * @param queuedValues  defines the input stream
-   * @param <T>           stream type
-   * @return the queue that defines the input stream
+   * Enqueue the next micro-batch's elements.
+   * This is backed by a {@link Queue}, so the stream's input order follows the population
+   * order (FIFO).
    */
-  public static <T> QueuedValues<T> fromQueue(Iterable<Iterable<T>> queuedValues) {
-    return new QueuedValues<>(queuedValues);
+  @SafeVarargs
+  public final CreateStream<T> nextBatch(T... batchElements) {
+    // validate timestamps if timestamped elements.
+    for (T element: batchElements) {
+      if (element instanceof TimestampedValue) {
+        TimestampedValue timestampedValue = (TimestampedValue) element;
+        checkArgument(
+            timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+            "Elements must have timestamps before %s. Got: %s",
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            timestampedValue.getTimestamp());
+      }
+    }
+    batches.offer(Arrays.asList(batchElements));
+    return this;
+  }
+
+  /** Set the initial synchronized processing time. */
+  public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime) {
+    return new CreateStream<>(batchInterval, initialSystemTime);
   }
 
   /**
-   * {@link PTransform} for queueing values.
+   * Advances the watermark in the next batch.
    */
-  public static final class QueuedValues<T> extends PTransform<PBegin, PCollection<T>> {
+  public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark) {
+    checkArgument(
+        !newWatermark.isBefore(lowWatermark), "The watermark is not allowed to decrease!");
+    checkArgument(
+        newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
+        newWatermark,
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    return advance(newWatermark);
+  }
 
-    private final Iterable<Iterable<T>> queuedValues;
+  /**
+   * Advances the watermark in the next batch to the end-of-time.
+   */
+  public CreateStream<T> advanceNextBatchWatermarkToInfinity() {
+    return advance(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
 
-    QueuedValues(Iterable<Iterable<T>> queuedValues) {
-      checkNotNull(
-          queuedValues, "need to set the queuedValues of an Create.QueuedValues transform");
-      this.queuedValues = queuedValues;
-    }
+  private CreateStream<T> advance(Instant newWatermark) {
+    // advance the system time.
+    Instant currentSynchronizedProcessingTime = times.peekLast() == null ? initialSystemTime
+        : times.peekLast().getSynchronizedProcessingTime();
+    Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchInterval);
+    checkArgument(
+        nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
+        "Synchronized processing time must always advance.");
+    times.offer(new SparkWatermarks(lowWatermark, newWatermark, nextSynchronizedProcessingTime));
+    lowWatermark = newWatermark;
+    return this;
+  }
 
-    public Iterable<Iterable<T>> getQueuedValues() {
-      return queuedValues;
-    }
+  /** Get the underlying queue representing the mock stream of micro-batches. */
+  public Queue<Iterable<T>> getBatches() {
+    return batches;
+  }
 
-    @Override
-    public PCollection<T> expand(PBegin input) {
-      // Spark streaming micro batches are bounded by default
-      return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-    }
+  /**
+   * Get times so they can be pushed into the
+   * {@link org.apache.beam.runners.spark.util.GlobalWatermarkHolder}.
+   */
+  public Queue<SparkWatermarks> getTimes() {
+    return times;
+  }
+
+  @Override
+  public PCollection<T> expand(PBegin input) {
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
   }
 
 }
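
A sketch of how the runner side is expected to consume the two queues exposed above, based on the
accessors' Javadoc and on GlobalWatermarkHolderTest below. The sourceId and the helper method are
illustrative assumptions, not code from this commit:

    import java.util.Queue;
    import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
    import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
    import org.apache.spark.api.java.JavaSparkContext;

    /** Illustrative only: publish the queued watermarks, one entry per micro-batch. */
    static void publishWatermarks(Queue<SparkWatermarks> times, int sourceId, JavaSparkContext jsc) {
      SparkWatermarks next;
      while ((next = times.poll()) != null) {
        GlobalWatermarkHolder.add(sourceId, next); // register the watermarks for this source.
        GlobalWatermarkHolder.advance(jsc);        // publish them to the workers.
      }
    }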

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
new file mode 100644
index 0000000..c1d2944
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+/**
+ * A test suite for the propagation of watermarks in the Spark runner.
+ */
+public class GlobalWatermarkHolderTest {
+
+  @Rule
+  public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public ReuseSparkContextRule reuseContext = ReuseSparkContextRule.yes();
+
+  private static final SparkPipelineOptions options =
+      PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+
+  private static final String INSTANT_PATTERN =
+      "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z";
+
+  @Test
+  public void testLowHighWatermarksAdvance() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    // low == high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(5)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+    // low < high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(10)),
+            instant.plus(Duration.millis(15)),
+            instant.plus(Duration.millis(100))));
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks in Broadcast.
+    SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10))));
+    assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15))));
+    assertThat(currentWatermarks.getSynchronizedProcessingTime(),
+        equalTo(instant.plus(Duration.millis(100))));
+
+    // assert illegal watermark advance.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        RegexMatcher.matches(
+            "Low watermark " + INSTANT_PATTERN + " cannot be later then high watermark "
+            + INSTANT_PATTERN));
+    // low > high -> not allowed!
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(25)),
+            instant.plus(Duration.millis(20)),
+            instant.plus(Duration.millis(200))));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testSynchronizedTimeMonotonic() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Synchronized processing time must advance.");
+    // no actual advancement of watermarks - fine by Watermarks
+    // but not by synchronized processing time.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testMultiSource() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.add(2,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(3)),
+            instant.plus(Duration.millis(6)),
+            instant));
+
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks for source 1.
+    SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5))));
+    assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10))));
+
+    // assert watermarks for source 2.
+    SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2);
+    assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3))));
+    assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6))));
+  }
+}
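
On the read side, worker code can inspect the broadcast watermarks much like the WatermarksDoFn in
the removed WatermarkTest below did; a trimmed sketch, keeping the null checks the original needed
because the watermark only advances onBatchCompleted (the helper itself is illustrative):

    import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
    import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;

    /** Illustrative: current watermarks for a source, or null if none have been published yet. */
    static SparkWatermarks currentWatermarks(int sourceId) {
      if (GlobalWatermarkHolder.get() == null
          || GlobalWatermarkHolder.get().getValue().get(sourceId) == null) {
        return null; // watermark not yet updated for this source.
      }
      return GlobalWatermarkHolder.get().getValue().get(sourceId);
    }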

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
deleted file mode 100644
index 027f9fd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.junit.rules.ExternalResource;
-
-/**
- * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) 
in tests.
- */
-public class ReuseSparkContext extends ExternalResource {
-
-  private final boolean reuse;
-
-  private ReuseSparkContext(boolean reuse) {
-    this.reuse = reuse;
-  }
-
-  public static ReuseSparkContext no() {
-    return new ReuseSparkContext(false);
-  }
-
-  public static ReuseSparkContext yes() {
-    return new ReuseSparkContext(true);
-  }
-
-  @Override
-  protected void before() throws Throwable {
-    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, 
Boolean.toString(reuse));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java
new file mode 100644
index 0000000..027f9fd
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) 
in tests.
+ */
+public class ReuseSparkContextRule extends ExternalResource {
+
+  private final boolean reuse;
+
+  private ReuseSparkContextRule(boolean reuse) {
+    this.reuse = reuse;
+  }
+
+  public static ReuseSparkContextRule no() {
+    return new ReuseSparkContextRule(false);
+  }
+
+  public static ReuseSparkContextRule yes() {
+    return new ReuseSparkContextRule(true);
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, 
Boolean.toString(reuse));
+  }
+}
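
Usage is unchanged apart from the rename; a test class opts in or out of Spark context reuse with a
single rule field, as CreateStreamTest below does:

    @Rule
    public transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();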

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
deleted file mode 100644
index 0a0abf9..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
-import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.RegexMatcher;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-
-/**
- * A test suite for the propagation of watermarks in the Spark runner.
- */
-public class WatermarkTest {
-
-  @Rule
-  public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule();
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Rule
-  public ReuseSparkContext reuseContext = ReuseSparkContext.yes();
-
-  private static final SparkPipelineOptions options =
-      PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
-
-  private static final String INSTANT_PATTERN =
-      "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z";
-
-  @Test
-  public void testLowHighWatermarksAdvance() {
-    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
-
-    Instant instant = new Instant(0);
-    // low == high.
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(5)),
-            instant.plus(Duration.millis(5)),
-            instant));
-    GlobalWatermarkHolder.advance(jsc);
-    // low < high.
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(10)),
-            instant.plus(Duration.millis(15)),
-            instant.plus(Duration.millis(100))));
-    GlobalWatermarkHolder.advance(jsc);
-
-    // assert watermarks in Broadcast.
-    SparkWatermarks currentWatermarks = 
GlobalWatermarkHolder.get().getValue().get(1);
-    assertThat(currentWatermarks.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(10))));
-    assertThat(currentWatermarks.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(15))));
-    assertThat(currentWatermarks.getSynchronizedProcessingTime(),
-        equalTo(instant.plus(Duration.millis(100))));
-
-    // assert illegal watermark advance.
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        RegexMatcher.matches(
-            "Low watermark " + INSTANT_PATTERN + " cannot be later then high 
watermark "
-            + INSTANT_PATTERN));
-    // low > high -> not allowed!
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(25)),
-            instant.plus(Duration.millis(20)),
-            instant.plus(Duration.millis(200))));
-    GlobalWatermarkHolder.advance(jsc);
-  }
-
-  @Test
-  public void testSynchronizedTimeMonotonic() {
-    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
-
-    Instant instant = new Instant(0);
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(5)),
-            instant.plus(Duration.millis(10)),
-            instant));
-    GlobalWatermarkHolder.advance(jsc);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Synchronized processing time must advance.");
-    // no actual advancement of watermarks - fine by Watermarks
-    // but not by synchronized processing time.
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(5)),
-            instant.plus(Duration.millis(10)),
-            instant));
-    GlobalWatermarkHolder.advance(jsc);
-  }
-
-  @Test
-  public void testMultiSource() {
-    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
-
-    Instant instant = new Instant(0);
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(5)),
-            instant.plus(Duration.millis(10)),
-            instant));
-    GlobalWatermarkHolder.add(2,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(3)),
-            instant.plus(Duration.millis(6)),
-            instant));
-
-    GlobalWatermarkHolder.advance(jsc);
-
-    // assert watermarks for source 1.
-    SparkWatermarks watermarksForSource1 = 
GlobalWatermarkHolder.get().getValue().get(1);
-    assertThat(watermarksForSource1.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(5))));
-    assertThat(watermarksForSource1.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(10))));
-
-    // assert watermarks for source 2.
-    SparkWatermarks watermarksForSource2 = 
GlobalWatermarkHolder.get().getValue().get(2);
-    assertThat(watermarksForSource2.getLowWatermark(), 
equalTo(instant.plus(Duration.millis(3))));
-    assertThat(watermarksForSource2.getHighWatermark(), 
equalTo(instant.plus(Duration.millis(6))));
-  }
-
-  @Test
-  @Ignore(
-      "BEAM-1526 - This test is flaky, and is expected to be fixed in "
-          + "https://github.com/apache/beam/pull/2050";)
-  public void testInDoFn() {
-    // because watermark advances onBatchCompleted.
-    Iterable<Integer> zeroBatch = Collections.emptyList();
-    Iterable<Integer> firstBatch = Collections.singletonList(1);
-    Iterable<Integer> secondBatch = Collections.singletonList(2);
-
-    Instant instant = new Instant(0);
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(5)),
-            instant.plus(Duration.millis(10)),
-            instant));
-    GlobalWatermarkHolder.add(1,
-        new SparkWatermarks(
-            instant.plus(Duration.millis(10)),
-            instant.plus(Duration.millis(15)),
-            instant.plus(options.getBatchIntervalMillis())));
-
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    options.setBatchIntervalMillis(500L);
-    Pipeline p = Pipeline.create(options);
-
-    CreateStream.QueuedValues<Integer> queueStream =
-        CreateStream.fromQueue(Arrays.asList(zeroBatch, firstBatch, 
secondBatch));
-
-    p.apply(queueStream).setCoder(VarIntCoder.of()).apply(ParDo.of(new 
WatermarksDoFn(1)));
-
-    
p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3));
-
-    // this is a hacky way to assert but it will do until triggers are 
supported.
-    assertThat(
-        WatermarksDoFn.strings,
-        containsInAnyOrder(
-            "element: 1 lowWatermark: 5 highWatermark: 10 processingTime: 0",
-            "element: 2 lowWatermark: 10 highWatermark: 15 processingTime: 
1000"));
-  }
-
-  private static class WatermarksDoFn extends DoFn<Integer, String> {
-    private final int sourceId;
-
-    static List<String> strings = new ArrayList<>();
-
-    private WatermarksDoFn(int sourceId) {
-      this.sourceId = sourceId;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      if (GlobalWatermarkHolder.get() == null
-          || GlobalWatermarkHolder.get().getValue().get(sourceId) == null) {
-        // watermark not yet updated.
-        return;
-      }
-      SparkWatermarks sparkWatermarks = 
GlobalWatermarkHolder.get().getValue().get(sourceId);
-      Integer element = c.element();
-      String output =
-          "element: " + element
-          + " lowWatermark: " + sparkWatermarks.getLowWatermark().getMillis()
-          + " highWatermark: " + sparkWatermarks.getHighWatermark().getMillis()
-          + " processingTime: " + 
sparkWatermarks.getSynchronizedProcessingTime().getMillis();
-      strings.add(output);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
new file mode 100644
index 0000000..0cb33ab
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.runners.spark.ReuseSparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+
+/**
+ * A test suite for the Spark runner's implementation of triggers and panes.
+ *
+ * <p>Since Spark is a micro-batch engine and will process any test-sized input within the same
+ * (first) batch, it is important to make sure inputs are ingested across micro-batches using
+ * {@link org.apache.spark.streaming.dstream.QueueInputDStream}.
+ */
+public class CreateStreamTest implements Serializable {
+
+  @Rule
+  public transient TemporaryFolder checkpointParentDir = new TemporaryFolder();
+  @Rule
+  public transient SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
+  @Rule
+  public transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
+  @Rule
+  public transient TestName testName = new TestName();
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testLateDataAccumulating() throws IOException {
+    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    Instant instant = new Instant(0);
+    CreateStream<TimestampedValue<Integer>> source =
+        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch()
+            
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
+            .nextBatch(
+                TimestampedValue.of(1, instant),
+                TimestampedValue.of(2, instant),
+                TimestampedValue.of(3, instant))
+            
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
+            // These elements are late but within the allowed lateness
+            .nextBatch(
+                TimestampedValue.of(4, instant),
+                TimestampedValue.of(5, instant))
+            // These elements are droppably late
+            .advanceNextBatchWatermarkToInfinity()
+            .nextBatch(
+                TimestampedValue.of(-1, instant),
+                TimestampedValue.of(-2, instant),
+                TimestampedValue.of(-3, instant));
+
+    PCollection<Integer> windowed = p
+        
.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
+        .apply(ParDo.of(new OnlyValue<Integer>()))
+        
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
+            AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(Duration.standardMinutes(2)))
+                .withLateFirings(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.standardMinutes(5), 
Window.ClosingBehavior.FIRE_ALWAYS));
+    PCollection<Integer> triggered = windowed.apply(WithKeys.<Integer, 
Integer>of(1))
+        .apply(GroupByKey.<Integer, Integer>create())
+        .apply(Values.<Iterable<Integer>>create())
+        .apply(Flatten.<Integer>iterables());
+    PCollection<Long> count = 
windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Integer> sum = 
windowed.apply(Sum.integersGlobally().withoutDefaults());
+
+    IntervalWindow window = new IntervalWindow(instant, 
instant.plus(Duration.standardMinutes(5L)));
+    PAssert.that(triggered)
+        .inFinalPane(window)
+        .containsInAnyOrder(1, 2, 3, 4, 5);
+    PAssert.that(triggered)
+        .inOnTimePane(window)
+        .containsInAnyOrder(1, 2, 3);
+    PAssert.that(count)
+        .inWindow(window)
+        .satisfies(new SerializableFunction<Iterable<Long>, Void>() {
+          @Override
+          public Void apply(Iterable<Long> input) {
+            for (Long count : input) {
+              assertThat(count, allOf(greaterThanOrEqualTo(3L), 
lessThanOrEqualTo(5L)));
+            }
+            return null;
+          }
+        });
+    PAssert.that(sum)
+        .inWindow(window)
+        .satisfies(new SerializableFunction<Iterable<Integer>, Void>() {
+          @Override
+          public Void apply(Iterable<Integer> input) {
+            for (Integer sum : input) {
+              assertThat(sum, allOf(greaterThanOrEqualTo(6), 
lessThanOrEqualTo(15)));
+            }
+            return null;
+          }
+        });
+
+    p.run();
+  }
+
+//  @Test
+//  @Category({NeedsRunner.class, UsesTestStream.class})
+//  public void testProcessingTimeTrigger() {
+//    TestStream<Long> source = TestStream.create(VarLongCoder.of())
+//            .addElements(TimestampedValue.of(1L, new Instant(1000L)),
+//                    TimestampedValue.of(2L, new Instant(2000L)))
+//            .advanceProcessingTime(Duration.standardMinutes(12))
+//            .addElements(TimestampedValue.of(3L, new Instant(3000L)))
+//            .advanceProcessingTime(Duration.standardMinutes(6))
+//            .advanceWatermarkToInfinity();
+//
+//    PCollection<Long> sum = p.apply(source)
+//            .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
+//                    
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+//                            
.plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
+//                    .withAllowedLateness(Duration.ZERO))
+//            .apply(Sum.longsGlobally());
+//
+//    PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
+//
+//    p.run();
+//  }
+
+  @Test
+  public void testDiscardingMode() throws IOException {
+    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    CreateStream<TimestampedValue<String>> source =
+        CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+            .nextBatch(
+                TimestampedValue.of("firstPane", new Instant(100)),
+                TimestampedValue.of("alsoFirstPane", new Instant(200)))
+            .advanceWatermarkForNextBatch(new Instant(1001L))
+            .nextBatch(
+                TimestampedValue.of("onTimePane", new Instant(500)))
+            .advanceNextBatchWatermarkToInfinity()
+            .nextBatch(
+                TimestampedValue.of("finalLatePane", new Instant(750)),
+                TimestampedValue.of("alsoFinalLatePane", new Instant(250)));
+
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values =
+        
p.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
+            .apply(ParDo.of(new OnlyValue<String>()))
+            .apply(
+                Window.<String>into(windowFn)
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(AfterPane.elementCountAtLeast(2))
+                            .withLateFirings(Never.ever()))
+                    .discardingFiredPanes()
+                    .withAllowedLateness(allowedLateness))
+            .apply(WithKeys.<Integer, String>of(1))
+            .apply(GroupByKey.<Integer, String>create())
+            .apply(Values.<Iterable<String>>create())
+            .apply(Flatten.<String>iterables());
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(100));
+    PAssert.that(values)
+        .inWindow(window)
+        .containsInAnyOrder(
+            "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", 
"alsoFinalLatePane");
+    PAssert.that(values)
+        .inCombinedNonLatePanes(window)
+        .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
+    PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane");
+    PAssert.that(values)
+        .inFinalPane(window)
+        .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");
+
+    p.run();
+  }
+
+  @Test
+  public void testFirstElementLate() throws IOException {
+    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    Instant lateElementTimestamp = new Instant(-1_000_000);
+    CreateStream<TimestampedValue<String>> source =
+        CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+            .nextBatch()
+            .advanceWatermarkForNextBatch(new Instant(-1_000_000))
+            .nextBatch(
+                TimestampedValue.of("late", lateElementTimestamp),
+                TimestampedValue.of("onTime", new Instant(100)))
+            .advanceNextBatchWatermarkToInfinity();
+
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values = p.apply(source)
+            
.setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
+        .apply(ParDo.of(new OnlyValue<String>()))
+        .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of())
+            .discardingFiredPanes()
+            .withAllowedLateness(allowedLateness))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    //TODO: empty panes do not emit anything, so Spark won't evaluate an "empty" assertion.
+//    PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
+    PAssert.that(values)
+        .inWindow(windowFn.assignWindow(new Instant(100)))
+        .containsInAnyOrder("onTime");
+
+    p.run();
+  }
+
+  @Test
+  public void testElementsAtAlmostPositiveInfinity() throws IOException {
+    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+    CreateStream<TimestampedValue<String>> source =
+        CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+            .nextBatch(
+                TimestampedValue.of("foo", endOfGlobalWindow),
+                TimestampedValue.of("bar", endOfGlobalWindow))
+            .advanceNextBatchWatermarkToInfinity();
+
+    FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
+    PCollection<String> windowedValues = p.apply(source)
+            
.setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
+        .apply(ParDo.of(new OnlyValue<String>()))
+        .apply(Window.<String>into(windows))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    PAssert.that(windowedValues)
+        .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+        .containsInAnyOrder("foo", "bar");
+    p.run();
+  }
+
+//  @Test
+//  public void testMultipleStreams() throws IOException {
+//    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+//    Pipeline p = Pipeline.create(options);
+//    options.setJobName(testName.getMethodName());
+//    Duration batchDuration = 
Duration.millis(options.getBatchIntervalMillis());
+//
+//    CreateStream<String> source =
+//        CreateStream.<String>withBatchInterval(batchDuration)
+//            .nextBatch("foo", "bar").advanceWatermarkForNextBatch(new 
Instant(100))
+//            .nextBatch().advanceNextBatchWatermarkToInfinity();
+//
+////    CreateStream<Integer> other =
+////        CreateStream.<Integer>withBatchInterval(batchDuration)
+////            .nextBatch(1, 2, 3, 4)
+////            .advanceNextBatchWatermarkToInfinity();
+//
+//    PCollection<String> createStrings =
+//        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
+//            .apply("WindowStrings",
+//                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+//                    .withAllowedLateness(Duration.ZERO)
+//                    .accumulatingFiredPanes());
+//    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+////    PCollection<Integer> createInts =
+////        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
+////            .apply("WindowInts",
+////                
Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+////                    .withAllowedLateness(Duration.ZERO)
+////                    .accumulatingFiredPanes());
+////    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+//
+//    p.run();
+//  }
+
+  @Test
+  public void testElementAtPositiveInfinityThrows() {
+    Duration batchDuration = 
Duration.millis(commonOptions.getOptions().getBatchIntervalMillis());
+    CreateStream<TimestampedValue<Integer>> source =
+        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch(TimestampedValue.of(-1, 
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));
+    thrown.expect(IllegalArgumentException.class);
+    source.nextBatch(TimestampedValue.of(1, 
BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  public void testAdvanceWatermarkNonMonotonicThrows() {
+    Duration batchDuration = 
Duration.millis(commonOptions.getOptions().getBatchIntervalMillis());
+    CreateStream<Integer> source =
+        CreateStream.<Integer>withBatchInterval(batchDuration)
+            .advanceWatermarkForNextBatch(new Instant(0L));
+    thrown.expect(IllegalArgumentException.class);
+    source.advanceWatermarkForNextBatch(new Instant(-1L));
+  }
+
+  @Test
+  public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
+    Duration batchDuration = 
Duration.millis(commonOptions.getOptions().getBatchIntervalMillis());
+    CreateStream<Integer> source =
+        CreateStream.<Integer>withBatchInterval(batchDuration)
+            
.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
+    thrown.expect(IllegalArgumentException.class);
+    source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  private static class OnlyValue<T> extends DoFn<TimestampedValue<T>, T> {
+
+    OnlyValue() { }
+
+    @ProcessElement
+    public void onlyValue(ProcessContext c) {
+      c.outputWithTimestamp(c.element().getValue(), 
c.element().getTimestamp());
+    }
+  }
+}
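
A note on the bounds asserted in testLateDataAccumulating above (my reading of the
accumulating-panes setup, not part of the diff): the on-time pane of the asserted window holds
{1, 2, 3}, the late-but-allowed {4, 5} are added to later accumulating panes, and the droppably
late {-1, -2, -3} are discarded, so for any fired pane in that window:

    // per-pane count: 3 (on-time pane)   <= count <= 5 (final pane)
    // per-pane sum:   1 + 2 + 3 = 6      <= sum   <= 1 + 2 + 3 + 4 + 5 = 15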

http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 62ee672..e307363 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -17,60 +17,70 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.spark.ReuseSparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
 
 
 /**
- * Test pipelines which are resumed from checkpoint.
+ * Tests DStream recovery from checkpoint.
+ *
+ * <p>Runs the pipeline reading from a Kafka backlog with a WM function that will move to
+ * infinity on an EOF signal.
+ * After resuming from checkpoint, a single output (guaranteed by the WM) is asserted, along with
+ * {@link Aggregator}s values that are expected to resume from the previous count as well.
  */
 public class ResumeFromCheckpointStreamingTest {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper 
EMBEDDED_ZOOKEEPER =
@@ -78,142 +88,158 @@ public class ResumeFromCheckpointStreamingTest {
   private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
       new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new 
Properties());
   private static final String TOPIC = "kafka_beam_test_topic";
-  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
-      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-  );
-  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", 
"k4,v4"};
-  private static final long EXPECTED_AGG_FIRST = 4L;
-  private static final long EXPECTED_COUNTER_FIRST = 4L;
-  private static final long EXPECTED_AGG_SECOND = 8L;
-  private static final long EXPECTED_COUNTER_SECOND = 8L;
 
   @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
+  public ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
   @Rule
-  public ClearAggregatorsRule clearAggregatorsRule = new 
ClearAggregatorsRule();
+  public transient TestName testName = new TestName();
 
   @BeforeClass
   public static void init() throws IOException {
     EMBEDDED_ZOOKEEPER.startup();
     EMBEDDED_KAFKA_CLUSTER.startup();
-    /// this test actually requires to NOT reuse the context but rather to 
stop it and start again
-    // from the checkpoint with a brand new context.
-    System.setProperty("beam.spark.test.reuseSparkContext", "false");
   }
 
-  private static void produce() {
+  private static void produce(Map<String, Instant> messages) {
     Properties producerProps = new Properties();
     producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
     producerProps.put("request.required.acks", 1);
     producerProps.put("bootstrap.servers", 
EMBEDDED_KAFKA_CLUSTER.getBrokerList());
     Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> 
kafkaProducer =
-        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
-          for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+    Serializer<Instant> instantSerializer = new Serializer<Instant>() {
+      @Override
+      public void configure(Map<String, ?> configs, boolean isKey) { }
+
+      @Override
+      public byte[] serialize(String topic, Instant data) {
+        return CoderHelpers.toByteArray(data, InstantCoder.of());
+      }
+
+      @Override
+      public void close() { }
+    };
+
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> 
kafkaProducer =
+        new KafkaProducer(producerProps, stringSerializer, instantSerializer)) 
{
+          for (Map.Entry<String, Instant> en : messages.entrySet()) {
             kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), 
en.getValue()));
           }
           kafkaProducer.close();
         }
   }
 
-  /**
-   * Tests DStream recovery from checkpoint - recreate the job and continue 
(from checkpoint).
-   * <p>Also tests Aggregator values, which should be restored upon recovery 
from checkpoint.</p>
-   */
   @Test
-  public void testRun() throws Exception {
-    Duration batchIntervalDuration = Duration.standardSeconds(5);
-    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    // provide a generous enough batch-interval to have everything fit in one 
micro-batch.
-    options.setBatchIntervalMillis(batchIntervalDuration.getMillis());
-    // provide a very generous read time bound, we rely on num records bound 
here.
-    options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis());
-    // bound the read on the number of messages - 1 topic of 4 messages.
-    options.setMaxRecordsPerBatch(4L);
-
-    // checkpoint after first (and only) interval.
-    options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
-
-    MetricsFilter metricsFilter =
-        MetricsFilter.builder()
-            
.addNameFilter(MetricNameFilter.inNamespace(ResumeFromCheckpointStreamingTest.class))
-            .build();
+  public void testWithResume() throws Exception {
+    SparkPipelineOptions options = 
PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    options.setCheckpointDir(tmpFolder.newFolder().toString());
+    options.setCheckpointDurationMillis(500L);
+    options.setJobName(testName.getMethodName());
+    options.setSparkMaster("local[*]");
+
+    // write to Kafka
+    produce(ImmutableMap.of(
+        "k1", new Instant(100),
+        "k2", new Instant(200),
+        "k3", new Instant(300),
+        "k4", new Instant(400)
+    ));
 
     // first run will read from Kafka backlog - "auto.offset.reset=smallest"
     SparkPipelineResult res = run(options);
+    res.waitUntilFinish(Duration.standardSeconds(2));
+    // assertions 1:
     long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class);
-    assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", EXPECTED_AGG_FIRST, processedMessages1), processedMessages1,
-            equalTo(EXPECTED_AGG_FIRST));
-    assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
-        hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
-            "aCounter", "formatKV", EXPECTED_COUNTER_FIRST)));
+    assertThat(
+        String.format(
+            "Expected %d processed messages count but found %d", 4, processedMessages1),
+        processedMessages1,
+        equalTo(4L));
+
+    //--- between executions:
+
+    //- clear state.
+    AccumulatorSingleton.clear();
+    GlobalWatermarkHolder.clear();
+
+    //- write a bit more.
+    produce(ImmutableMap.of(
+        "k5", new Instant(499),
+        "EOF", new Instant(500) // to be dropped from [0, 500).
+    ));
 
     // recovery should resume from last read offset, and read the second batch of input.
     res = runAgain(options);
+    res.waitUntilFinish(Duration.standardSeconds(2));
+    // assertions 2:
     long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
-    assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2,
-            equalTo(EXPECTED_AGG_SECOND));
-    assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
-        hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
-            "aCounter", "formatKV", EXPECTED_COUNTER_SECOND)));
+    int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+    assertThat(
+        String.format("Expected %d processed messages count but found %d", 5, processedMessages2),
+        processedMessages2,
+        equalTo(5L));
+    assertThat(
+        String.format(
+            "Expected %d successful assertions, but found %d.", 1, successAssertions),
+        successAssertions,
+        is(1));
+    // validate that no assertion failed.
+    int failedAssertions = res.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+    assertThat(
+        String.format("Found %d failed assertions.", failedAssertions),
+        failedAssertions,
+        is(0));
+
   }
 
   private SparkPipelineResult runAgain(SparkPipelineOptions options) {
-    clearAggregatorsRule.clearNamedAggregators();
     // sleep before next run.
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
     return run(options);
   }
 
   private static SparkPipelineResult run(SparkPipelineOptions options) {
-    // write to Kafka
-    produce();
-    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
-        "auto.offset.reset", "earliest"
-    );
-
-    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
+    KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(TOPIC))
         .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of())
-        .updateConsumerProperties(consumerProps);
-
-    Duration windowDuration = new Duration(options.getBatchIntervalMillis());
+        .withValueCoder(InstantCoder.of())
+        .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
+        .withTimestampFn(new SerializableFunction<KV<String, Instant>, Instant>() {
+          @Override
+          public Instant apply(KV<String, Instant> kv) {
+            return kv.getValue();
+          }
+        }).withWatermarkFn(new SerializableFunction<KV<String, Instant>, Instant>() {
+          @Override
+          public Instant apply(KV<String, Instant> kv) {
+            // at EOF move WM to infinity.
+            String key = kv.getKey();
+            Instant instant = kv.getValue();
+            return key.equals("EOF") ? BoundedWindow.TIMESTAMP_MAX_VALUE : instant;
+          }
+        });
 
     Pipeline p = Pipeline.create(options);
 
-    PCollection<String> expectedCol =
-        p.apply(Create.of(ImmutableList.copyOf(EXPECTED)).withCoder(StringUtf8Coder.of()));
-    final PCollectionView<List<String>> expectedView = expectedCol.apply(View.<String>asList());
-
-    PCollection<String> formattedKV =
-        p.apply(read.withoutMetadata())
-            .apply("formatKV", ParDo.of(new DoFn<KV<String, String>, 
KV<String, String>>() {
-              Counter counter =
-                  Metrics.counter(ResumeFromCheckpointStreamingTest.class, 
"aCounter");
-
-              @ProcessElement
-              public void process(ProcessContext c) {
-                // Check side input is passed correctly also after resuming from checkpoint
-                Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED));
-                counter.inc();
-                c.output(c.element());
-              }
-            }).withSideInputs(expectedView))
-            .apply(Window.<KV<String, String>>into(FixedWindows.of(windowDuration)))
-            .apply(ParDo.of(new FormatAsText()));
-
-    // graceful shutdown will make sure first batch (at least) will finish.
-    Duration timeout = Duration.standardSeconds(1L);
-    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, timeout);
+    PCollection<Iterable<String>> grouped = p
+        .apply(read.withoutMetadata())
+        .apply(Keys.<String>create())
+        .apply(ParDo.of(new EOFShallNotPassFn()))
+        .apply(Window.<String>into(FixedWindows.of(Duration.millis(500)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.ZERO))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create());
+
+    grouped.apply(new PAssertWithoutFlatten<>("k1", "k2", "k3", "k4", "k5"));
+
+    return (SparkPipelineResult) p.run();
   }
 
   @AfterClass
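
For reference, a minimal, self-contained sketch (illustrative names, not part of this change) of
the FixedWindows boundary math behind the "to be dropped from [0, 500)" comment in testWithResume
above: records timestamped 100-499 land in the window [0, 500), while the EOF sentinel at 500 is
assigned to the next window, so pushing the watermark to TIMESTAMP_MAX_VALUE fires the on-time
pane of [0, 500) with exactly k1..k5.

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WindowBoundarySketch {
      public static void main(String[] args) {
        FixedWindows fixedWindows = FixedWindows.of(Duration.millis(500));
        // k5 (timestamp 499) is still assigned to the window asserted on in run().
        IntervalWindow first = fixedWindows.assignWindow(new Instant(499));
        // The EOF sentinel (timestamp 500) falls into the next window, and is additionally
        // filtered out by EOFShallNotPassFn before the GroupByKey.
        IntervalWindow second = fixedWindows.assignWindow(new Instant(500));
        // first spans [0, 500) and second spans [500, 1000), in epoch millis.
        System.out.println(first + " / " + second);
      }
    }
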
@@ -222,16 +248,60 @@ public class ResumeFromCheckpointStreamingTest {
     EMBEDDED_ZOOKEEPER.shutdown();
   }
 
-  private static class FormatAsText extends DoFn<KV<String, String>, String> {
-
+  /** A pass-through fn that prevents the EOF event from passing through. */
+  private static class EOFShallNotPassFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> aggregator =
         createAggregator("processedMessages", Sum.ofLongs());
 
     @ProcessElement
     public void process(ProcessContext c) {
-      aggregator.addValue(1L);
-      String formatted = c.element().getKey() + "," + c.element().getValue();
-      c.output(formatted);
+      String element = c.element();
+      if (!element.equals("EOF")) {
+        aggregator.addValue(1L);
+        c.output(c.element());
+      }
+    }
+  }
+
+  /**
+   * A custom PAssert that avoids using {@link org.apache.beam.sdk.transforms.Flatten}
+   * until BEAM-1444 is resolved.
+   */
+  private static class PAssertWithoutFlatten<T>
+      extends PTransform<PCollection<Iterable<T>>, PDone> {
+    private final T[] expected;
+
+    private PAssertWithoutFlatten(T... expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public PDone expand(PCollection<Iterable<T>> input) {
+      input.apply(ParDo.of(new AssertDoFn<>(expected)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
+      private final Aggregator<Integer, Integer> success =
+          createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
+      private final Aggregator<Integer, Integer> failure =
+          createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
+      private final T[] expected;
+
+      AssertDoFn(T[] expected) {
+        this.expected = expected;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        try {
+          assertThat(c.element(), containsInAnyOrder(expected));
+          success.addValue(1);
+        } catch (Throwable t) {
+          failure.addValue(1);
+          throw t;
+        }
+      }
     }
   }
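
As a usage note, a minimal round-trip sketch for the coder-backed Kafka value serialization set up
in produce() above. It assumes the Spark runner's CoderHelpers
(org.apache.beam.runners.spark.coders) also exposes a fromByteArray(byte[], Coder) counterpart to
the toByteArray call; in the test itself decoding is left to KafkaIO via
withValueCoder(InstantCoder.of()).

    import org.apache.beam.runners.spark.coders.CoderHelpers;
    import org.apache.beam.sdk.coders.InstantCoder;
    import org.joda.time.Instant;

    public class InstantRoundTripSketch {
      public static void main(String[] args) {
        Instant original = new Instant(499);
        // Encode exactly as the embedded Serializer<Instant> in produce() does.
        byte[] payload = CoderHelpers.toByteArray(original, InstantCoder.of());
        // Decode with the same coder, mirroring what withValueCoder(InstantCoder.of())
        // does for the record value on the read side.
        Instant decoded = CoderHelpers.fromByteArray(payload, InstantCoder.of());
        if (!original.equals(decoded)) {
          throw new AssertionError("Round trip failed: " + decoded);
        }
      }
    }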
 
