Fix ParDoTest#testPipelineOptionsParameter

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/725f547f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/725f547f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/725f547f

Branch: refs/heads/master
Commit: 725f547f5e487dd3e84d5d0f95c0fa3efa853279
Parents: 2206827
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Sat Jul 8 00:13:19 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Sat Jul 8 00:13:19 2017 +0800

----------------------------------------------------------------------
 .../gearpump/translators/io/GearpumpSource.java  | 12 ++----------
 .../translators/utils/DoFnRunnerFactory.java     |  5 +++--
 .../translators/utils/TranslatorUtils.java       | 19 +++++++++++++++++++
 3 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index daa8c81..2f53139 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import java.io.IOException;
 import java.time.Instant;
 
@@ -48,11 +45,7 @@ public abstract class GearpumpSource<T> implements 
DataSource {
   private boolean available = false;
 
   GearpumpSource(PipelineOptions options) {
-    try {
-      this.serializedOptions = new ObjectMapper().writeValueAsBytes(options);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
+    this.serializedOptions = TranslatorUtils.serializePipelineOptions(options);
   }
 
   protected abstract Source.Reader<T> createReader(PipelineOptions options) 
throws IOException;
@@ -60,8 +53,7 @@ public abstract class GearpumpSource<T> implements DataSource 
{
   @Override
   public void open(TaskContext context, Instant startTime) {
     try {
-      PipelineOptions options = new ObjectMapper()
-          .readValue(serializedOptions, PipelineOptions.class);
+      PipelineOptions options = 
TranslatorUtils.deserializePipelineOptions(serializedOptions);
       this.reader = createReader(options);
       this.available = reader.start();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 35cf2b5..375b696 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -43,7 +43,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements 
Serializable {
 
   private static final long serialVersionUID = -4109539010014189725L;
   private final DoFn<InputT, OutputT> fn;
-  private final transient PipelineOptions options;
+  private final byte[] serializedOptions;
   private final Collection<PCollectionView<?>> sideInputs;
   private final DoFnRunners.OutputManager outputManager;
   private final TupleTag<OutputT> mainOutputTag;
@@ -61,7 +61,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements 
Serializable {
       StepContext stepContext,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
-    this.options = pipelineOptions;
+    this.serializedOptions = 
TranslatorUtils.serializePipelineOptions(pipelineOptions);
     this.sideInputs = sideInputs;
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
@@ -72,6 +72,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements 
Serializable {
 
   public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
       ReadyCheckingSideInputReader sideInputReader) {
+    PipelineOptions options = 
TranslatorUtils.deserializePipelineOptions(serializedOptions);
     DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, windowingStrategy);

http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b1cd61c..c14298f 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,8 +18,11 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -141,6 +145,21 @@ public class TranslatorUtils {
     }
   }
 
+  public static byte[] serializePipelineOptions(PipelineOptions options) {
+    try {
+      return new ObjectMapper().writeValueAsBytes(options);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static PipelineOptions deserializePipelineOptions(byte[] 
serializedOptions) {
+    try {
+      return new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
   /**
    * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.

Reply via email to