Repository: beam Updated Branches: refs/heads/mr-runner b6f22aa76 -> 2cef54ea2
mr-runner: use SerializablePipelineOptions to serde PipelineOptions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71b5e7c4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71b5e7c4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71b5e7c4 Branch: refs/heads/mr-runner Commit: 71b5e7c45d1501030717cbfd608bfae36641de79 Parents: b6f22aa Author: huafengw <fvunic...@gmail.com> Authored: Wed Sep 13 10:24:24 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Nov 8 14:28:41 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 2 +- .../mapreduce/translation/BeamInputFormat.java | 22 +++--- .../mapreduce/translation/JobPrototype.java | 4 +- .../mapreduce/translation/ParDoOperation.java | 7 +- .../translation/SerializedPipelineOptions.java | 76 -------------------- 5 files changed, 20 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 000f20c..7f2e851 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -93,7 +93,7 @@ </profiles> <dependencies> - <!-- MapRecue dependencies --> + <!-- MapReduce dependencies --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 3d0b8ea..8a55c5e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -31,6 +31,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; + +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -57,7 +59,7 @@ public class BeamInputFormat<T> extends InputFormat { private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; private List<ReadOperation.TaggedSource> sources; - private SerializedPipelineOptions options; + private SerializablePipelineOptions options; public BeamInputFormat() { } @@ -73,8 +75,8 @@ public class BeamInputFormat<T> extends InputFormat { } sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedBoundedSource), "TaggedSources"); - options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray( - Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions")); + options = ((SerializablePipelineOptions) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedPipelineOptions), "SerializablePipelineOptions")); try { @@ -86,7 +88,7 @@ public class BeamInputFormat<T> extends InputFormat { final ReadOperation.TaggedSource taggedSource) { try { return FluentIterable.from(taggedSource.getSource().split( - DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions())) + DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.get())) .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() { @Override public ReadOperation.TaggedSource apply(BoundedSource<?> input) { @@ -120,7 +122,7 @@ public class BeamInputFormat<T> extends InputFormat { private static class BeamInputSplit<T> extends InputSplit implements Writable { private String stepName; private BoundedSource<T> boundedSource; - private SerializedPipelineOptions options; + private SerializablePipelineOptions options; private TupleTag<?> tupleTag; public BeamInputSplit() { @@ -129,7 +131,7 @@ public class BeamInputFormat<T> extends InputFormat { public BeamInputSplit( String stepName, BoundedSource<T> boundedSource, - SerializedPipelineOptions options, + SerializablePipelineOptions options, TupleTag<?> tupleTag) { this.stepName = checkNotNull(stepName, "stepName"); this.boundedSource = checkNotNull(boundedSource, "boundedSources"); @@ -139,13 +141,13 @@ public class BeamInputFormat<T> extends InputFormat { public BeamRecordReader<T> createReader() throws IOException { return new BeamRecordReader<>( - stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag); + stepName, boundedSource.createReader(options.get()), tupleTag); } @Override public long getLength() throws IOException, InterruptedException { try { - return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions()); + return boundedSource.getEstimatedSizeBytes(options.get()); } catch (Exception e) { Throwables.throwIfUnchecked(e); Throwables.throwIfInstanceOf(e, IOException.class); @@ -164,7 +166,7 @@ public class BeamInputFormat<T> extends InputFormat { ByteArrayOutputStream stream = new ByteArrayOutputStream(); StringUtf8Coder.of().encode(stepName, stream); SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream); - SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream); + SerializableCoder.of(SerializablePipelineOptions.class).encode(options, stream); SerializableCoder.of(TupleTag.class).encode(tupleTag, stream); byte[] bytes = stream.toByteArray(); @@ -181,7 +183,7 @@ public class BeamInputFormat<T> extends InputFormat { ByteArrayInputStream inStream = new ByteArrayInputStream(bytes); stepName = StringUtf8Coder.of().decode(inStream); boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream); - options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream); + options = SerializableCoder.of(SerializablePipelineOptions.class).decode(inStream); tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index e8e6eab..3e0061a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; + +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -106,7 +108,7 @@ public class JobPrototype { conf.set( BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( - new SerializedPipelineOptions(options)))); + new SerializablePipelineOptions(options)))); job.setInputFormatClass(BeamInputFormat.class); if (fusedStep.containsGroupByKey()) { http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index ef83e72..fd4daca 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; @@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; */ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> { private final String stepName; - protected final SerializedPipelineOptions options; + protected final SerializablePipelineOptions options; protected final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; protected final WindowingStrategy<?, ?> windowingStrategy; @@ -70,7 +71,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> WindowingStrategy<?, ?> windowingStrategy) { super(1 + sideOutputTags.size()); this.stepName = checkNotNull(stepName, "stepName"); - this.options = new SerializedPipelineOptions(checkNotNull(options, "options")); + this.options = new SerializablePipelineOptions(checkNotNull(options, "options")); this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag"); this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags"); this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy"); @@ -109,7 +110,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> final TimerInternals timerInternals = new InMemoryTimerInternals(); fnRunner = DoFnRunners.simpleRunner( - options.getPipelineOptions(), + options.get(), getDoFn(), sideInputTags.isEmpty() ? NullSideInputReader.empty() : http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java deleted file mode 100644 index 5c37b7c..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.common.ReflectHelpers; - -/** - * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. - */ -public class SerializedPipelineOptions implements Serializable { - - private final byte[] serializedOptions; - - /** Lazily initialized copy of deserialized options. */ - private transient PipelineOptions pipelineOptions; - - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - createMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - - } - - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); - - FileSystems.setDefaultPipelineOptions(pipelineOptions); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } - - return pipelineOptions; - } - - /** - * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing - * for user specified configuration injection into the ObjectMapper. This supports user custom - * types on {@link PipelineOptions}. - */ - private static ObjectMapper createMapper() { - return new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - } -}