Rehydrate PCollections

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20ce0756
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20ce0756
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20ce0756

Branch: refs/heads/master
Commit: 20ce0756c97f5ed47ad9c8cb46da574c273b5b46
Parents: c14a318
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Jul 10 20:04:14 2017 -0700

----------------------------------------------------------------------
 .../construction/PCollectionTranslation.java    | 16 ++++++++++++++
 .../PCollectionTranslationTest.java             | 22 ++++++++++++++++++++
 2 files changed, 38 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
         .build();
   }
 
+  public static PCollection<?> fromProto(
+      Pipeline pipeline, RunnerApi.PCollection pCollection, 
RunnerApi.Components components)
+      throws IOException {
+    return PCollection.createPrimitiveOutputInternal(
+            pipeline,
+            WindowingStrategyTranslation.fromProto(
+                
components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+                components),
+            fromProto(pCollection.getIsBounded()))
+        .setCoder(
+            (Coder)
+                CoderTranslation.fromProto(
+                    components.getCodersOrThrow(pCollection.getCoderId()), 
components));
+  }
+
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
     return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
+    // Encode
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PCollection protoCollection =
+        PCollectionTranslation.toProto(testCollection, sdkComponents);
+    RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+    // Decode
+    Pipeline pipeline = Pipeline.create();
+    PCollection<?> decodedCollection =
+        PCollectionTranslation.fromProto(pipeline, protoCollection, 
protoComponents);
+
+    // Verify
+    assertThat(decodedCollection.getCoder(), 
Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
+    assertThat(
+        decodedCollection.getWindowingStrategy(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(
+            testCollection.getWindowingStrategy().fixDefaults()));
+    assertThat(decodedCollection.isBounded(), 
equalTo(testCollection.isBounded()));
+  }
+
+  @Test
+  public void testEncodeDecodeFields() throws Exception {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection = PCollectionTranslation
         .toProto(testCollection, sdkComponents);

Reply via email to