Rehydrate PCollections
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20ce0756
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20ce0756
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20ce0756

Branch: refs/heads/master
Commit: 20ce0756c97f5ed47ad9c8cb46da574c273b5b46
Parents: c14a318
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Jul 10 20:04:14 2017 -0700

----------------------------------------------------------------------
 .../construction/PCollectionTranslation.java    | 16 ++++++++++++++
 .../PCollectionTranslationTest.java             | 22 ++++++++++++++++++++
 2 files changed, 38 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
         .build();
   }
 
+  public static PCollection<?> fromProto(
+      Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components)
+      throws IOException {
+    return PCollection.createPrimitiveOutputInternal(
+            pipeline,
+            WindowingStrategyTranslation.fromProto(
+                components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+                components),
+            fromProto(pCollection.getIsBounded()))
+        .setCoder(
+            (Coder)
+                CoderTranslation.fromProto(
+                    components.getCodersOrThrow(pCollection.getCoderId()), components));
+  }
+
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
     return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
+    // Encode
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PCollection protoCollection =
+        PCollectionTranslation.toProto(testCollection, sdkComponents);
+    RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+    // Decode
+    Pipeline pipeline = Pipeline.create();
+    PCollection<?> decodedCollection =
+        PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents);
+
+    // Verify
+    assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
+    assertThat(
+        decodedCollection.getWindowingStrategy(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(
+            testCollection.getWindowingStrategy().fixDefaults()));
+    assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded()));
+  }
+
+  @Test
+  public void testEncodeDecodeFields() throws Exception {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection = PCollectionTranslation
         .toProto(testCollection, sdkComponents);