Fix ParDoTest#testPipelineOptionsParameter
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/725f547f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/725f547f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/725f547f Branch: refs/heads/master Commit: 725f547f5e487dd3e84d5d0f95c0fa3efa853279 Parents: 2206827 Author: manuzhang <owenzhang1...@gmail.com> Authored: Sat Jul 8 00:13:19 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Sat Jul 8 00:13:19 2017 +0800 ---------------------------------------------------------------------- .../gearpump/translators/io/GearpumpSource.java | 12 ++---------- .../translators/utils/DoFnRunnerFactory.java | 5 +++-- .../translators/utils/TranslatorUtils.java | 19 +++++++++++++++++++ 3 files changed, 24 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index daa8c81..2f53139 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.gearpump.translators.io; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import java.time.Instant; @@ -48,11 +45,7 @@ public abstract class GearpumpSource<T> implements DataSource { private boolean available = false; GearpumpSource(PipelineOptions options) { - try { - this.serializedOptions = new ObjectMapper().writeValueAsBytes(options); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + this.serializedOptions = TranslatorUtils.serializePipelineOptions(options); } protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException; @@ -60,8 +53,7 @@ public abstract class GearpumpSource<T> implements DataSource { @Override public void open(TaskContext context, Instant startTime) { try { - PipelineOptions options = new ObjectMapper() - .readValue(serializedOptions, PipelineOptions.class); + PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); this.reader = createReader(options); this.available = reader.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 35cf2b5..375b696 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -43,7 +43,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { private static final long serialVersionUID = -4109539010014189725L; private final DoFn<InputT, OutputT> fn; - private final transient PipelineOptions options; + private final byte[] serializedOptions; private final Collection<PCollectionView<?>> sideInputs; private final DoFnRunners.OutputManager outputManager; private final TupleTag<OutputT> mainOutputTag; @@ -61,7 +61,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { StepContext stepContext, WindowingStrategy<?, ?> windowingStrategy) { this.fn = doFn; - this.options = pipelineOptions; + this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions); this.sideInputs = sideInputs; this.outputManager = outputManager; this.mainOutputTag = mainOutputTag; @@ -72,6 +72,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner( ReadyCheckingSideInputReader sideInputReader) { + PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner( options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index b1cd61c..c14298f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -18,8 +18,11 @@ package org.apache.beam.runners.gearpump.translators.utils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import java.io.IOException; import java.time.Instant; import java.util.Collection; import java.util.HashMap; @@ -27,6 +30,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -141,6 +145,21 @@ public class TranslatorUtils { } } + public static byte[] serializePipelineOptions(PipelineOptions options) { + try { + return new ObjectMapper().writeValueAsBytes(options); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) { + try { + return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } /** * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.