Add multi-stream and flattened-stream tests.

Misc. fixups.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24ab6053
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24ab6053
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24ab6053

Branch: refs/heads/master
Commit: 24ab60538697b284b915157e08218de2e1e42f7b
Parents: da5f849
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 20 00:14:39 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:02 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     |   8 +-
 .../beam/runners/spark/io/CreateStream.java     |  41 +++++
 .../translation/streaming/CreateStreamTest.java | 161 ++++++++++++-------
 3 files changed, 146 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 24bc038..985f75d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -24,7 +24,8 @@ import static org.hamcrest.Matchers.is;
 import java.io.File;
 import java.io.IOException;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -118,8 +119,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     long timeout = sparkOptions.getForcedTimeout();
     SparkPipelineResult result = null;
     try {
-      // clear state of Accumulators and Aggregators.
-      AccumulatorSingleton.clear();
+      // clear state of Aggregators, Metrics and Watermarks.
+      AggregatorsAccumulator.clear();
+      SparkMetricsContainer.clear();
       GlobalWatermarkHolder.clear();
 
      TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
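
For context, the state cleared above appears to live in process-wide singletons, so any harness that reuses the JVM across pipeline runs needs the same reset. A minimal sketch (the JUnit scaffolding here is hypothetical; the clear() calls are the ones introduced in this hunk):

    import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
    import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
    import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
    import org.junit.Before;

    public class MySparkStreamingTest {
      @Before
      public void clearSparkState() {
        // Mirror TestSparkRunner: reset Aggregators, Metrics and Watermarks
        // shared across runs in the same JVM.
        AggregatorsAccumulator.clear();
        SparkMetricsContainer.clear();
        GlobalWatermarkHolder.clear();
      }
    }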

http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 2149372..70784f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -37,6 +37,47 @@ import org.joda.time.Instant;
 /**
  * Create an input stream from Queue. For SparkRunner tests only.
  *
+ * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind
+ * that there are eventually two queues here - one for batches and another for Watermarks.
+ *
+ * <p>While both queues advance according to Spark's batch-interval, there is a slight difference
+ * between how data is pushed into the stream and how Watermarks advance: Watermarks advance in
+ * the onBatchCompleted hook, so to set the watermark advancement for a specific batch it should
+ * be called before that batch.
+ * Also keep in mind that since the queue is polled once per batch-interval, "holding" the same
+ * Watermark without advancing it has to be stated explicitly, or the Watermark will advance as
+ * soon as it can (in the next onBatchCompleted hook).
+ *
+ * <p>Example 1:
+ *
+ * {@code
+ * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+ *     .nextBatch(
+ *         TimestampedValue.of("foo", endOfGlobalWindow),
+ *         TimestampedValue.of("bar", endOfGlobalWindow))
+ *     .advanceNextBatchWatermarkToInfinity();
+ * }
+ * The first batch will see the default start-of-time WM of
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see
+ * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Example 2:
+ *
+ * {@code
+ * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+ *     .nextBatch(
+ *         TimestampedValue.of(1, instant))
+ *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
+ *     .nextBatch(
+ *         TimestampedValue.of(2, instant))
+ *     .nextBatch(
+ *         TimestampedValue.of(3, instant))
+ *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
+ * }
+ * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM.
+ * The third batch will see the WM advanced to +30 min., because this is the next advancement of
+ * the WM regardless of where it was called in the construction of CreateStream.
+ * //TODO: write a proper Builder enforcing all those rules mentioned.
  * @param <T> stream type.
  */
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
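
To make the "hold" rule in the new javadoc concrete, here is a minimal sketch, assuming that repeating advanceWatermarkForNextBatch with the same Instant is how the hold is stated explicitly:

    // Sketch only: hold the watermark at `instant` across two batches, then release it.
    CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
        .nextBatch(TimestampedValue.of(1, instant))
        .advanceWatermarkForNextBatch(instant)     // explicit hold - the WM stays put.
        .nextBatch(TimestampedValue.of(2, instant))
        .advanceWatermarkForNextBatch(instant)     // hold again, or the WM advances on its own.
        .nextBatch(TimestampedValue.of(3, instant))
        .advanceNextBatchWatermarkToInfinity();    // advance to TIMESTAMP_MAX_VALUE.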

http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 0cb33ab..9ee5cc5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -68,6 +69,9 @@ import org.junit.rules.TestName;
 * <p>Since Spark is a micro-batch engine, and will process any test-sized input
 * within the same (first) batch, it is important to make sure inputs are ingested across
 * micro-batches using {@link org.apache.spark.streaming.dstream.QueueInputDStream}.
+ * This test suite uses {@link CreateStream} to construct such
+ * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs.
+ * //TODO: add synchronized/processing time trigger.
  */
 public class CreateStreamTest implements Serializable {
 
@@ -161,29 +165,6 @@ public class CreateStreamTest implements Serializable {
     p.run();
   }
 
-//  @Test
-//  @Category({NeedsRunner.class, UsesTestStream.class})
-//  public void testProcessingTimeTrigger() {
-//    TestStream<Long> source = TestStream.create(VarLongCoder.of())
-//            .addElements(TimestampedValue.of(1L, new Instant(1000L)),
-//                    TimestampedValue.of(2L, new Instant(2000L)))
-//            .advanceProcessingTime(Duration.standardMinutes(12))
-//            .addElements(TimestampedValue.of(3L, new Instant(3000L)))
-//            .advanceProcessingTime(Duration.standardMinutes(6))
-//            .advanceWatermarkToInfinity();
-//
-//    PCollection<Long> sum = p.apply(source)
-//            .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
-//                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-//                            .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
-//                    .withAllowedLateness(Duration.ZERO))
-//            .apply(Sum.longsGlobally());
-//
-//    PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
-//
-//    p.run();
-//  }
-
   @Test
   public void testDiscardingMode() throws IOException {
    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
@@ -249,7 +230,7 @@ public class CreateStreamTest implements Serializable {
     CreateStream<TimestampedValue<String>> source =
         CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
             .nextBatch()
-            .advanceWatermarkForNextBatch(new Instant(-1_000_000))
+            .advanceWatermarkForNextBatch(new Instant(0))
             .nextBatch(
                 TimestampedValue.of("late", lateElementTimestamp),
                 TimestampedValue.of("onTime", new Instant(100)))
@@ -268,8 +249,9 @@ public class CreateStreamTest implements Serializable {
         .apply(Values.<Iterable<String>>create())
         .apply(Flatten.<String>iterables());
 
-    //TODO: empty panes do not emmit anything so Spark won't evaluate an "empty" assertion.
-//    PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
+    PAssert.that(values)
+        .inWindow(windowFn.assignWindow(lateElementTimestamp))
+        .empty();
     PAssert.that(values)
         .inWindow(windowFn.assignWindow(new Instant(100)))
         .containsInAnyOrder("onTime");
@@ -308,40 +290,97 @@ public class CreateStreamTest implements Serializable {
     p.run();
   }
 
-//  @Test
-//  public void testMultipleStreams() throws IOException {
-//    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-//    Pipeline p = Pipeline.create(options);
-//    options.setJobName(testName.getMethodName());
-//    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
-//
-//    CreateStream<String> source =
-//        CreateStream.<String>withBatchInterval(batchDuration)
-//            .nextBatch("foo", "bar").advanceWatermarkForNextBatch(new Instant(100))
-//            .nextBatch().advanceNextBatchWatermarkToInfinity();
-//
-////    CreateStream<Integer> other =
-////        CreateStream.<Integer>withBatchInterval(batchDuration)
-////            .nextBatch(1, 2, 3, 4)
-////            .advanceNextBatchWatermarkToInfinity();
-//
-//    PCollection<String> createStrings =
-//        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
-//            .apply("WindowStrings",
-//                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
-//                    .withAllowedLateness(Duration.ZERO)
-//                    .accumulatingFiredPanes());
-//    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
-////    PCollection<Integer> createInts =
-////        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
-////            .apply("WindowInts",
-////                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
-////                    .withAllowedLateness(Duration.ZERO)
-////                    .accumulatingFiredPanes());
-////    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
-//
-//    p.run();
-//  }
+  @Test
+  public void testMultipleStreams() throws IOException {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    CreateStream<String> source =
+        CreateStream.<String>withBatchInterval(batchDuration)
+            .nextBatch("foo", "bar")
+            .advanceNextBatchWatermarkToInfinity();
+    CreateStream<Integer> other =
+        CreateStream.<Integer>withBatchInterval(batchDuration)
+            .nextBatch(1, 2, 3, 4)
+            .advanceNextBatchWatermarkToInfinity();
+
+    PCollection<String> createStrings =
+        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
+            .apply("WindowStrings",
+                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+
+    PCollection<Integer> createInts =
+        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
+            .apply("WindowInts",
+                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+
+    p.run();
+  }
+
+  @Test
+  public void testFlattenedWithWatermarkHold() throws IOException {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    Instant instant = new Instant(0);
+    CreateStream<TimestampedValue<Integer>> source1 =
+        CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch()
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
+            .nextBatch(
+                TimestampedValue.of(1, instant),
+                TimestampedValue.of(2, instant),
+                TimestampedValue.of(3, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
+    CreateStream<TimestampedValue<Integer>> source2 =
+        CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch()
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
+            .nextBatch(
+                TimestampedValue.of(4, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(2)))
+            .nextBatch(
+                TimestampedValue.of(5, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)));
+
+    PCollection<Integer> windowed1 = p
+        .apply(source1).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
+        .apply(ParDo.of(new OnlyValue<Integer>()))
+        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+    PCollection<Integer> windowed2 = p
+        .apply(source2).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
+        .apply(ParDo.of(new OnlyValue<Integer>()))
+        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+
+    PCollectionList<Integer> pCollectionList = PCollectionList.of(windowed1).and(windowed2);
+    PCollection<Integer> flattened = pCollectionList.apply(Flatten.<Integer>pCollections());
+    PCollection<Integer> triggered = flattened
+        .apply(WithKeys.<Integer, Integer>of(1))
+        .apply(GroupByKey.<Integer, Integer>create())
+        .apply(Values.<Iterable<Integer>>create())
+        .apply(Flatten.<Integer>iterables());
+
+    IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
+    PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3, 4, 5);
+
+    p.run();
+  }
 
   @Test
   public void testElementAtPositiveInfinityThrows() {
@@ -378,7 +417,7 @@ public class CreateStreamTest implements Serializable {
     OnlyValue() { }
 
     @ProcessElement
-    public void onlyValue(ProcessContext c) {
+    public void emitTimestampedValue(ProcessContext c) {
      c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
     }
   }
