Expose internal constructors for TestStream events

These are needed for deserialization from proto.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47cea784
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47cea784
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47cea784

Branch: refs/heads/master
Commit: 47cea78496a9a464d8cea7943a2f741c03692612
Parents: f5e30c5
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jun 1 19:17:58 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Jun 2 10:06:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/TestStream.java | 60 ++++++++++++++------
 1 file changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/47cea784/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d41b9ef..9ad8fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -24,8 +24,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -86,8 +88,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Adds the specified elements to the source with timestamp equal to the current watermark.
      *
-     * @return A {@link TestStream.Builder} like this one that will add the provided elements
-     *         after all earlier events have completed.
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements after
+     *     all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(T element, T... elements) {
@@ -103,8 +105,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Adds the specified elements to the source with the provided timestamps.
      *
-     * @return A {@link TestStream.Builder} like this one that will add the provided elements
-     *         after all earlier events have completed.
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements after
+     *     all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(
@@ -136,7 +138,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
      * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.
      *
      * @return A {@link TestStream.Builder} like this one that will advance the watermark to the
-     *         specified point after all earlier events have completed.
+     *     specified point after all earlier events have completed.
      */
     public Builder<T> advanceWatermarkTo(Instant newWatermark) {
       checkArgument(
@@ -146,10 +148,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
           newWatermark,
           BoundedWindow.TIMESTAMP_MAX_VALUE);
-      ImmutableList<Event<T>> newEvents = ImmutableList.<Event<T>>builder()
-          .addAll(events)
-          .add(WatermarkEvent.<T>advanceTo(newWatermark))
-          .build();
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(WatermarkEvent.<T>advanceTo(newWatermark))
+              .build();
       return new Builder<T>(coder, newEvents, newWatermark);
     }
 
@@ -157,7 +160,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
      * Advance the processing time by the specified amount.
      *
      * @return A {@link TestStream.Builder} like this one that will advance the processing time by
-     *         the specified amount after all earlier events have completed.
+     *     the specified amount after all earlier events have completed.
      */
     public Builder<T> advanceProcessingTime(Duration amount) {
       checkArgument(
@@ -194,9 +197,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     EventType getType();
   }
 
-  /**
-   * The types of {@link Event} that are supported by {@link TestStream}.
-   */
+  /** The types of {@link Event} that are supported by {@link TestStream}. */
   public enum EventType {
     ELEMENT,
     WATERMARK,
@@ -213,7 +214,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
       return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build());
     }
 
-    static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
+    /**
+     * <b>For internal use only: no backwards compatibility guarantees.</b>
+     */
+    @Internal
+    public static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
       return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements);
     }
   }
@@ -223,7 +228,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   public abstract static class WatermarkEvent<T> implements Event<T> {
     public abstract Instant getWatermark();
 
-    static <T> Event<T> advanceTo(Instant newWatermark) {
+    /**
+     * <b>For internal use only: no backwards compatibility guarantees.</b>
+     */
+    @Internal
+    public static <T> Event<T> advanceTo(Instant newWatermark) {
       return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark);
     }
   }
@@ -233,7 +242,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   public abstract static class ProcessingTimeEvent<T> implements Event<T> {
     public abstract Duration getProcessingTimeAdvance();
 
-    static <T> Event<T> advanceBy(Duration amount) {
+    /**
+     * <b>For internal use only: no backwards compatibility guarantees.</b>
+     */
+    @Internal
+    public static <T> Event<T> advanceBy(Duration amount) {
       return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount);
     }
   }
@@ -257,4 +270,19 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   public List<Event<T>> getEvents() {
     return events;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof TestStream)) {
+      return false;
+    }
+    TestStream<?> that = (TestStream<?>) other;
+
+    return getValueCoder().equals(that.getValueCoder()) && getEvents().equals(that.getEvents());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(TestStream.class, getValueCoder(), getEvents());
+  }
 }

Reply via email to