Port DirectRunner TestStream override to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaaf45fa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaaf45fa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaaf45fa Branch: refs/heads/gearpump-runner Commit: eaaf45fa33d500a9f0fd0c2861aac4889ee5086c Parents: ed6bd18 Author: Kenneth Knowles <k...@google.com> Authored: Thu Jun 8 13:39:32 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jun 9 19:56:52 2017 -0700 ---------------------------------------------------------------------- .../construction/TestStreamTranslation.java | 49 +++++++++++++++++++- .../direct/TestStreamEvaluatorFactory.java | 20 ++++++-- .../org/apache/beam/sdk/testing/TestStream.java | 12 +++++ 3 files changed, 75 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index 90e6304..515de57 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -18,6 +18,9 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; + import com.google.auto.service.AutoService; import com.google.protobuf.Any; import 
com.google.protobuf.ByteString; @@ -33,6 +36,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -57,6 +62,48 @@ public class TestStreamTranslation { return builder.build(); } + private static TestStream<?> fromProto( + RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components) + throws IOException { + + Coder<Object> coder = + (Coder<Object>) + CoderTranslation.fromProto( + components.getCodersOrThrow(testStreamPayload.getCoderId()), components); + + List<TestStream.Event<Object>> events = new ArrayList<>(); + + for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) { + events.add(fromProto(event, coder)); + } + return TestStream.fromRawEvents(coder, events); + } + + /** + * Converts an {@link AppliedPTransform}, which may be a rehydrated transform or an original + * {@link TestStream}, to a {@link TestStream}. 
+ */ + public static <T> TestStream<T> getTestStream( + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application) + throws IOException { + // For robustness, we don't take this shortcut: + // if (application.getTransform() instanceof TestStream) { + // return application.getTransform() + // } + + SdkComponents sdkComponents = SdkComponents.create(); + RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents); + checkArgument( + TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()), + "Attempt to get %s from a transform with wrong URN %s", + TestStream.class.getSimpleName(), + transformProto.getSpec().getUrn()); + RunnerApi.TestStreamPayload testStreamPayload = + transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class); + + return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents()); + } + static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder) throws IOException { switch (event.getType()) { @@ -130,7 +177,7 @@ public class TestStreamTranslation { static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> { @Override public String getUrn(TestStream<?> transform) { - return PTransformTranslation.TEST_STREAM_TRANSFORM_URN; + return TEST_STREAM_TRANSFORM_URN; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 2da7a71..16c8589 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.TestStreamTranslation; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.testing.TestStream; @@ -160,7 +162,8 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { } static class DirectTestStreamFactory<T> - implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> { + implements PTransformOverrideFactory< + PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> { private final DirectRunner runner; DirectTestStreamFactory(DirectRunner runner) { @@ -169,10 +172,17 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { @Override public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( - AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) { - return PTransformReplacement.of( - transform.getPipeline().begin(), - new DirectTestStream<T>(runner, transform.getTransform())); + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) { + try { + return PTransformReplacement.of( + transform.getPipeline().begin(), + new DirectTestStream<T>(runner, TestStreamTranslation.getTestStream(transform))); + } catch (IOException exc) { + throw new RuntimeException( + 
String.format( + "Transform could not be converted to %s", TestStream.class.getSimpleName()), + exc); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index 9ad8fd8..d13fcf1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -271,6 +271,18 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { return events; } + /** + * <b>For internal use only. No backwards-compatibility guarantees.</b> + * + * <p>Build a test stream directly from events. No validation is performed on + * watermark monotonicity, etc. This is assumed to be a previously-serialized + * {@link TestStream} transform that is correct by construction. + */ + @Internal + public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) { + return new TestStream<>(coder, events); + } + @Override public boolean equals(Object other) { if (!(other instanceof TestStream)) {