Port DirectRunner TestStream override to SDK-agnostic APIs

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaaf45fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaaf45fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaaf45fa

Branch: refs/heads/gearpump-runner
Commit: eaaf45fa33d500a9f0fd0c2861aac4889ee5086c
Parents: ed6bd18
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jun 8 13:39:32 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../construction/TestStreamTranslation.java     | 49 +++++++++++++++++++-
 .../direct/TestStreamEvaluatorFactory.java      | 20 ++++++--
 .../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
 3 files changed, 75 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 90e6304..515de57 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+
 import com.google.auto.service.AutoService;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
@@ -33,6 +36,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -57,6 +62,48 @@ public class TestStreamTranslation {
     return builder.build();
   }
 
+  private static TestStream<?> fromProto(
+      RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components 
components)
+      throws IOException {
+
+    Coder<Object> coder =
+        (Coder<Object>)
+            CoderTranslation.fromProto(
+                components.getCodersOrThrow(testStreamPayload.getCoderId()), 
components);
+
+    List<TestStream.Event<Object>> events = new ArrayList<>();
+
+    for (RunnerApi.TestStreamPayload.Event event : 
testStreamPayload.getEventsList()) {
+      events.add(fromProto(event, coder));
+    }
+    return TestStream.fromRawEvents(coder, events);
+  }
+
+  /**
+   * Converts an {@link AppliedPTransform}, which may be a rehydrated 
transform or an original
+   * {@link TestStream}, to a {@link TestStream}.
+   */
+  public static <T> TestStream<T> getTestStream(
+      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> application)
+      throws IOException {
+    // For robustness, we don't take this shortcut:
+    // if (application.getTransform() instanceof TestStream) {
+    //   return application.getTransform()
+    // }
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform transformProto = 
PTransformTranslation.toProto(application, sdkComponents);
+    checkArgument(
+        TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+        "Attempt to get %s from a transform with wrong URN %s",
+        TestStream.class.getSimpleName(),
+        transformProto.getSpec().getUrn());
+    RunnerApi.TestStreamPayload testStreamPayload =
+        
transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+
+    return (TestStream<T>) fromProto(testStreamPayload, 
sdkComponents.toComponents());
+  }
+
   static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> 
event, Coder<T> coder)
       throws IOException {
     switch (event.getType()) {
@@ -130,7 +177,7 @@ public class TestStreamTranslation {
   static class TestStreamTranslator implements 
TransformPayloadTranslator<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
-      return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+      return TEST_STREAM_TRANSFORM_URN;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2da7a71..16c8589 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.TestStreamTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.testing.TestStream;
@@ -160,7 +162,8 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
   }
 
   static class DirectTestStreamFactory<T>
-      implements PTransformOverrideFactory<PBegin, PCollection<T>, 
TestStream<T>> {
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
     private final DirectRunner runner;
 
     DirectTestStreamFactory(DirectRunner runner) {
@@ -169,10 +172,17 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     @Override
     public PTransformReplacement<PBegin, PCollection<T>> 
getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) {
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(),
-          new DirectTestStream<T>(runner, transform.getTransform()));
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> transform) {
+      try {
+        return PTransformReplacement.of(
+            transform.getPipeline().begin(),
+            new DirectTestStream<T>(runner, 
TestStreamTranslation.getTestStream(transform)));
+      } catch (IOException exc) {
+        throw new RuntimeException(
+            String.format(
+                "Transform could not be converted to %s", 
TestStream.class.getSimpleName()),
+            exc);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 9ad8fd8..d13fcf1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,6 +271,18 @@ public final class TestStream<T> extends 
PTransform<PBegin, PCollection<T>> {
     return events;
   }
 
+  /**
+   * <b>For internal use only. No backwards-compatibility guarantees.</b>
+   *
+   * <p>Builder a test stream directly from events. No validation is performed 
on
+   * watermark monotonicity, etc. This is assumed to be a previously-serialized
+   * {@link TestStream} transform that is correct by construction.
+   */
+  @Internal
+  public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> 
events) {
+    return new TestStream<>(coder, events);
+  }
+
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof TestStream)) {

Reply via email to