Make CreateStream a TimestampedValue Source.

fixup! a nicer DSL to construct CreateStream.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62ddca63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62ddca63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62ddca63

Branch: refs/heads/master
Commit: 62ddca638ae76918f3a0a5fac4ca8fbdb1c7fb9a
Parents: 123f482
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 27 23:35:10 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:09 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/io/CreateStream.java     | 66 ++++++++++++-----
 .../streaming/StreamingTransformTranslator.java | 26 +++++--
 .../runners/spark/SparkPipelineStateTest.java   |  2 +-
 .../translation/streaming/CreateStreamTest.java | 76 ++++++++------------
 .../streaming/TrackStreamingSourcesTest.java    | 22 +++---
 5 files changed, 111 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 70784f1..f2e0bb3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -19,11 +19,16 @@ package org.apache.beam.runners.spark.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Lists;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -83,20 +88,22 @@ import org.joda.time.Instant;
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
 
   private final Duration batchInterval;
-  private final Instant initialSystemTime;
-  private final Queue<Iterable<T>> batches = new LinkedList<>();
+  private final Queue<Iterable<TimestampedValue<T>>> batches = new 
LinkedList<>();
   private final Deque<SparkWatermarks> times = new LinkedList<>();
+  private final Coder<T> coder;
+  private Instant initialSystemTime;
 
   private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test 
purposes.
 
-  private CreateStream(Duration batchInterval, Instant initialSystemTime) {
+  private CreateStream(Duration batchInterval, Instant initialSystemTime, 
Coder<T> coder) {
     this.batchInterval = batchInterval;
     this.initialSystemTime = initialSystemTime;
+    this.coder = coder;
   }
 
   /** Set the batch interval for the stream. */
-  public static <T> CreateStream<T> withBatchInterval(Duration batchInterval) {
-    return new CreateStream<>(batchInterval, new Instant(0));
+  public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval) 
{
+    return new CreateStream<>(batchInterval, new Instant(0), coder);
   }
 
   /**
@@ -104,25 +111,46 @@ public final class CreateStream<T> extends 
PTransform<PBegin, PCollection<T>> {
    * This is backed by a {@link Queue} so stream input order would keep the 
population order (FIFO).
    */
   @SafeVarargs
-  public final CreateStream<T> nextBatch(T... batchElements) {
+  public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements) 
{
     // validate timestamps if timestamped elements.
-    for (T element: batchElements) {
-      if (element instanceof TimestampedValue) {
-        TimestampedValue timestampedValue = (TimestampedValue) element;
-        checkArgument(
-            
timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
-            "Elements must have timestamps before %s. Got: %s",
-            BoundedWindow.TIMESTAMP_MAX_VALUE,
-            timestampedValue.getTimestamp());
-      }
+    for (TimestampedValue<T> element: batchElements) {
+      TimestampedValue timestampedValue = (TimestampedValue) element;
+      checkArgument(
+          
timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+          "Elements must have timestamps before %s. Got: %s",
+          BoundedWindow.TIMESTAMP_MAX_VALUE,
+          timestampedValue.getTimestamp());
     }
     batches.offer(Arrays.asList(batchElements));
     return this;
   }
 
+  /**
+   * For non-timestamped elements.
+   */
+  @SafeVarargs
+  public final CreateStream<T> nextBatch(T... batchElements) {
+    List<TimestampedValue<T>> timestamped = 
Lists.newArrayListWithCapacity(batchElements.length);
+    // as TimestampedValue.
+    for (T element: batchElements) {
+      timestamped.add(TimestampedValue.atMinimumTimestamp(element));
+    }
+    batches.offer(timestamped);
+    return this;
+  }
+
+  /**
+   * Adds an empty batch.
+   */
+  public CreateStream<T> emptyBatch() {
+    batches.offer(Collections.<TimestampedValue<T>>emptyList());
+    return this;
+  }
+
   /** Set the initial synchronized processing time. */
   public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime) {
-    return new CreateStream<>(batchInterval, initialSystemTime);
+    this.initialSystemTime = initialSystemTime;
+    return this;
   }
 
   /**
@@ -160,7 +188,7 @@ public final class CreateStream<T> extends 
PTransform<PBegin, PCollection<T>> {
   }
 
   /** Get the underlying queue representing the mock stream of micro-batches. 
*/
-  public Queue<Iterable<T>> getBatches() {
+  public Queue<Iterable<TimestampedValue<T>>> getBatches() {
     return batches;
   }
 
@@ -178,4 +206,8 @@ public final class CreateStream<T> extends 
PTransform<PBegin, PCollection<T>> {
         input.getPipeline(), WindowingStrategy.globalDefault(), 
PCollection.IsBounded.UNBOUNDED);
   }
 
+  @Override
+  protected Coder<T> getDefaultOutputCoder() throws 
CannotProvideCoderException {
+    return coder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index fc98781..4a07741 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nonnull;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -67,6 +68,8 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -75,6 +78,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -87,6 +91,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 
+
 /**
  * Supports translation between a Beam transform, and Spark's operations on 
DStreams.
  */
@@ -127,14 +132,25 @@ final class StreamingTransformTranslator {
       public void evaluate(CreateStream<T> transform, EvaluationContext 
context) {
         Coder<T> coder = context.getOutput(transform).getCoder();
         JavaStreamingContext jssc = context.getStreamingContext();
-        Queue<Iterable<T>> values = transform.getBatches();
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
+        Queue<Iterable<TimestampedValue<T>>> values = transform.getBatches();
+        WindowedValue.FullWindowedValueCoder<T> windowCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, 
GlobalWindow.Coder.INSTANCE);
         // create the DStream from queue.
         Queue<JavaRDD<WindowedValue<T>>> rddQueue = new 
LinkedBlockingQueue<>();
-        for (Iterable<T> v : values) {
+        for (Iterable<TimestampedValue<T>> tv : values) {
           Iterable<WindowedValue<T>> windowedValues =
-              Iterables.transform(v, 
WindowingHelpers.<T>windowValueFunction());
+              Iterables.transform(
+                  tv,
+                  new com.google.common.base.Function<TimestampedValue<T>, 
WindowedValue<T>>() {
+                    @Override
+                    public WindowedValue<T> apply(@Nonnull TimestampedValue<T> 
timestampedValue) {
+                      return WindowedValue.of(
+                          timestampedValue.getValue(),
+                          timestampedValue.getTimestamp(),
+                          GlobalWindow.INSTANCE,
+                          PaneInfo.NO_FIRING);
+                    }
+              });
           JavaRDD<WindowedValue<T>> rdd =
               jssc.sparkContext()
                   .parallelize(CoderHelpers.toByteArrays(windowedValues, 
windowCoder))

http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
index c856203..37a201c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
@@ -75,7 +75,7 @@ public class SparkPipelineStateTest implements Serializable {
 
   private PTransform<PBegin, PCollection<String>> getValues(final 
SparkPipelineOptions options) {
     return options.isStreaming()
-        ? 
CreateStream.<String>withBatchInterval(Duration.millis(1)).nextBatch("one", 
"two")
+        ? CreateStream.of(StringUtf8Coder.of(), 
Duration.millis(1)).nextBatch("one", "two")
         : Create.of("one", "two");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index f2783a1..ff77535 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -32,10 +32,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
@@ -83,9 +81,9 @@ public class CreateStreamTest implements Serializable {
   public void testLateDataAccumulating() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
-    CreateStream<TimestampedValue<Integer>> source =
-        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
-            .nextBatch()
+    CreateStream<Integer> source =
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+            .emptyBatch()
             
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
             .nextBatch(
                 TimestampedValue.of(1, instant),
@@ -104,8 +102,7 @@ public class CreateStreamTest implements Serializable {
                 TimestampedValue.of(-3, instant));
 
     PCollection<Integer> windowed = p
-        
.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
-        .apply(ParDo.of(new OnlyValue<Integer>()))
+        .apply(source)
         
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
             AfterWatermark.pastEndOfWindow()
                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
@@ -156,8 +153,8 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testDiscardingMode() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
-    CreateStream<TimestampedValue<String>> source =
-        
CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration())
+    CreateStream<String> source =
+        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
             .nextBatch(
                 TimestampedValue.of("firstPane", new Instant(100)),
                 TimestampedValue.of("alsoFirstPane", new Instant(200)))
@@ -172,8 +169,7 @@ public class CreateStreamTest implements Serializable {
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
     PCollection<String> values =
-        
p.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
-            .apply(ParDo.of(new OnlyValue<String>()))
+        p.apply(source)
             .apply(
                 Window.<String>into(windowFn)
                     .triggering(
@@ -207,9 +203,9 @@ public class CreateStreamTest implements Serializable {
   public void testFirstElementLate() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
     Instant lateElementTimestamp = new Instant(-1_000_000);
-    CreateStream<TimestampedValue<String>> source =
-        
CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration())
-            .nextBatch()
+    CreateStream<String> source =
+        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+            .emptyBatch()
             .advanceWatermarkForNextBatch(new Instant(0))
             .nextBatch(
                 TimestampedValue.of("late", lateElementTimestamp),
@@ -219,8 +215,6 @@ public class CreateStreamTest implements Serializable {
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
     PCollection<String> values = p.apply(source)
-            
.setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
-        .apply(ParDo.of(new OnlyValue<String>()))
         .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of())
             .discardingFiredPanes()
             .withAllowedLateness(allowedLateness))
@@ -243,8 +237,8 @@ public class CreateStreamTest implements Serializable {
   public void testElementsAtAlmostPositiveInfinity() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
     Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
-    CreateStream<TimestampedValue<String>> source =
-        
CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration())
+    CreateStream<String> source =
+        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
             .nextBatch(
                 TimestampedValue.of("foo", endOfGlobalWindow),
                 TimestampedValue.of("bar", endOfGlobalWindow))
@@ -252,8 +246,6 @@ public class CreateStreamTest implements Serializable {
 
     FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
     PCollection<String> windowedValues = p.apply(source)
-            
.setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
-        .apply(ParDo.of(new OnlyValue<String>()))
         .apply(Window.<String>into(windows))
         .apply(WithKeys.<Integer, String>of(1))
         .apply(GroupByKey.<Integer, String>create())
@@ -270,16 +262,16 @@ public class CreateStreamTest implements Serializable {
   public void testMultipleStreams() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
     CreateStream<String> source =
-        CreateStream.<String>withBatchInterval(pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
             .nextBatch("foo", "bar")
             .advanceNextBatchWatermarkToInfinity();
     CreateStream<Integer> other =
-        CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
             .nextBatch(1, 2, 3, 4)
             .advanceNextBatchWatermarkToInfinity();
 
     PCollection<String> createStrings =
-        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
+        p.apply("CreateStrings", source)
             .apply("WindowStrings",
                 Window.<String>triggering(AfterPane.elementCountAtLeast(2))
                     .withAllowedLateness(Duration.ZERO)
@@ -287,7 +279,7 @@ public class CreateStreamTest implements Serializable {
     PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
 
     PCollection<Integer> createInts =
-        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
+        p.apply("CreateInts", other)
             .apply("WindowInts",
                 Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
                     .withAllowedLateness(Duration.ZERO)
@@ -301,18 +293,18 @@ public class CreateStreamTest implements Serializable {
   public void testFlattenedWithWatermarkHold() throws IOException {
     Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
-    CreateStream<TimestampedValue<Integer>> source1 =
-        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
-            .nextBatch()
+    CreateStream<Integer> source1 =
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+            .emptyBatch()
             
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
             .nextBatch(
                 TimestampedValue.of(1, instant),
                 TimestampedValue.of(2, instant),
                 TimestampedValue.of(3, instant))
             
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
-    CreateStream<TimestampedValue<Integer>> source2 =
-        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
-            .nextBatch()
+    CreateStream<Integer> source2 =
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+            .emptyBatch()
             
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
             .nextBatch(
                 TimestampedValue.of(4, instant))
@@ -322,15 +314,13 @@ public class CreateStreamTest implements Serializable {
             
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)));
 
     PCollection<Integer> windowed1 = p
-        
.apply(source1).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
-        .apply(ParDo.of(new OnlyValue<Integer>()))
+        .apply(source1)
         
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO));
     PCollection<Integer> windowed2 = p
-        
.apply(source2).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
-        .apply(ParDo.of(new OnlyValue<Integer>()))
+        .apply(source2)
         
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
@@ -352,8 +342,8 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testElementAtPositiveInfinityThrows() {
-    CreateStream<TimestampedValue<Integer>> source =
-        
CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
+    CreateStream<Integer> source =
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
             .nextBatch(TimestampedValue.of(-1, 
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));
     thrown.expect(IllegalArgumentException.class);
     source.nextBatch(TimestampedValue.of(1, 
BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -362,7 +352,7 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkNonMonotonicThrows() {
     CreateStream<Integer> source =
-        CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
             .advanceWatermarkForNextBatch(new Instant(0L));
     thrown.expect(IllegalArgumentException.class);
     source.advanceWatermarkForNextBatch(new Instant(-1L));
@@ -371,19 +361,9 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
     CreateStream<Integer> source =
-        CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
             
.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
     thrown.expect(IllegalArgumentException.class);
     source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
-
-  private static class OnlyValue<T> extends DoFn<TimestampedValue<T>, T> {
-
-    OnlyValue() { }
-
-    @ProcessElement
-    public void emitTimestampedValue(ProcessContext c) {
-      c.outputWithTimestamp(c.element().getValue(), 
c.element().getTimestamp());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index 32cef7e..b181a04 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -77,11 +77,11 @@ public class TrackStreamingSourcesTest {
     Pipeline p = Pipeline.create(options);
 
     CreateStream<Integer> emptyStream =
-        CreateStream.<Integer>withBatchInterval(
-            Duration.millis(options.getBatchIntervalMillis())).nextBatch();
+        CreateStream.of(
+            VarIntCoder.of(),
+            Duration.millis(options.getBatchIntervalMillis())).emptyBatch();
 
-    p.apply(emptyStream).setCoder(VarIntCoder.of())
-        .apply(ParDo.of(new PassthroughFn<>()));
+    p.apply(emptyStream).apply(ParDo.of(new PassthroughFn<>()));
 
     p.traverseTopologically(new StreamingSourceTracker(jssc, p, 
ParDo.Bound.class,  0));
     assertThat(StreamingSourceTracker.numAssertions, equalTo(1));
@@ -97,14 +97,16 @@ public class TrackStreamingSourcesTest {
     Pipeline p = Pipeline.create(options);
 
     CreateStream<Integer> queueStream1 =
-        CreateStream.<Integer>withBatchInterval(
-            Duration.millis(options.getBatchIntervalMillis())).nextBatch();
+        CreateStream.of(
+            VarIntCoder.of(),
+            Duration.millis(options.getBatchIntervalMillis())).emptyBatch();
     CreateStream<Integer> queueStream2 =
-        CreateStream.<Integer>withBatchInterval(
-            Duration.millis(options.getBatchIntervalMillis())).nextBatch();
+        CreateStream.of(
+            VarIntCoder.of(),
+            Duration.millis(options.getBatchIntervalMillis())).emptyBatch();
 
-    PCollection<Integer> pcol1 = 
p.apply(queueStream1).setCoder(VarIntCoder.of());
-    PCollection<Integer> pcol2 = 
p.apply(queueStream2).setCoder(VarIntCoder.of());
+    PCollection<Integer> pcol1 = p.apply(queueStream1);
+    PCollection<Integer> pcol2 = p.apply(queueStream2);
     PCollection<Integer> flattened =
         
PCollectionList.of(pcol1).and(pcol2).apply(Flatten.<Integer>pCollections());
     flattened.apply(ParDo.of(new PassthroughFn<>()));

Reply via email to