Introduces SerializablePipelineOptions in core-construction. Removes analogous classes from spark/flink and their tests.
The analogous class in Spark was SparkRuntimeContext, which also contained a CoderRegistry, but the CoderRegistry was used only in a class that was itself unused. I removed that class. This also allows removing a bunch of Jackson dependencies from Spark runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db051ae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db051ae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db051ae Branch: refs/heads/master Commit: 7db051aeae2b8e6b2dbfcc1da31410ec118299f6 Parents: ff4b36c Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Jul 28 12:48:41 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Wed Aug 2 11:04:50 2017 -0700 ---------------------------------------------------------------------- runners/apex/pom.xml | 8 - .../operators/ApexGroupByKeyOperator.java | 6 +- .../operators/ApexParDoOperator.java | 6 +- .../ApexReadUnboundedInputOperator.java | 6 +- .../utils/SerializablePipelineOptions.java | 78 --------- .../translation/utils/PipelineOptionsTest.java | 150 ----------------- runners/core-construction-java/pom.xml | 15 ++ .../SerializablePipelineOptions.java | 74 +++++++++ .../SerializablePipelineOptionsTest.java | 89 ++++++++++ runners/flink/pom.xml | 10 -- .../functions/FlinkDoFnFunction.java | 10 +- .../FlinkMergingNonShuffleReduceFunction.java | 8 +- .../functions/FlinkPartialReduceFunction.java | 8 +- .../functions/FlinkReduceFunction.java | 8 +- .../functions/FlinkStatefulDoFnFunction.java | 10 +- .../utils/SerializedPipelineOptions.java | 77 --------- .../translation/wrappers/SourceInputFormat.java | 10 +- .../wrappers/streaming/DoFnOperator.java | 10 +- .../streaming/SplittableDoFnOperator.java | 2 +- .../streaming/io/BoundedSourceWrapper.java | 10 +- .../streaming/io/UnboundedSourceWrapper.java | 12 +- .../beam/runners/flink/PipelineOptionsTest.java | 165 +------------------ 
runners/spark/pom.xml | 12 -- .../spark/aggregators/NamedAggregators.java | 93 ----------- .../beam/runners/spark/io/SourceDStream.java | 20 +-- .../apache/beam/runners/spark/io/SourceRDD.java | 22 +-- .../runners/spark/io/SparkUnboundedSource.java | 6 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 10 +- .../spark/stateful/StateSpecFunctions.java | 8 +- .../spark/translation/EvaluationContext.java | 11 +- .../spark/translation/MultiDoFnFunction.java | 16 +- .../translation/SparkAbstractCombineFn.java | 9 +- .../spark/translation/SparkGlobalCombineFn.java | 5 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 9 +- .../spark/translation/SparkKeyedCombineFn.java | 5 +- .../spark/translation/SparkRuntimeContext.java | 90 ---------- .../spark/translation/TransformTranslator.java | 27 ++- .../streaming/StreamingTransformTranslator.java | 20 +-- .../translation/SparkRuntimeContextTest.java | 122 -------------- .../beam/sdk/options/PipelineOptions.java | 7 +- .../apache/beam/sdk/options/ValueProviders.java | 8 +- 41 files changed, 327 insertions(+), 945 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index fd5aafb..96aac8b 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -63,14 +63,6 @@ <version>${apex.malhar.version}</version> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> <version>${apex.core.version}</version> 
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 39f681f..5c0d72f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -33,7 +33,6 @@ import java.util.Collections; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -41,6 +40,7 @@ import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -149,7 +149,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator, @Override public void setup(OperatorContext context) { - this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this); + 
this.traceTuples = + ApexStreamTuple.Logging.isDebugEnabled( + serializedOptions.get().as(ApexPipelineOptions.class), this); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index c3cbab2..4dc807d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -40,7 +40,6 @@ import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; -import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.DoFnRunner; @@ -64,6 +63,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; @@ -386,7 +386,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements @Override public void 
setup(OperatorContext context) { - this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); + this.traceTuples = + ApexStreamTuple.Logging.isDebugEnabled( + pipelineOptions.get().as(ApexPipelineOptions.class), this); SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); if (!sideInputs.isEmpty()) { sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java index 1549560..21fb9d2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java @@ -30,8 +30,8 @@ import java.io.IOException; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple; -import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValuesSource; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -119,7 +119,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT @Override public void setup(OperatorContext context) { - 
this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); + this.traceTuples = + ApexStreamTuple.Logging.isDebugEnabled( + pipelineOptions.get().as(ApexPipelineOptions.class), this); try { reader = source.createReader(this.pipelineOptions.get(), null); available = reader.start(); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java deleted file mode 100644 index 46b04fc..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.apex.translation.utils; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.common.ReflectHelpers; - -/** - * A wrapper to enable serialization of {@link PipelineOptions}. - */ -public class SerializablePipelineOptions implements Externalizable { - - /* Used to ensure we initialize file systems exactly once, because it's a slow operation. */ - private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new AtomicBoolean(false); - - private transient ApexPipelineOptions pipelineOptions; - - public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { - this.pipelineOptions = pipelineOptions; - } - - public SerializablePipelineOptions() { - } - - public ApexPipelineOptions get() { - return this.pipelineOptions; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(createMapper().writeValueAsString(pipelineOptions)); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - String s = in.readUTF(); - this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class) - .as(ApexPipelineOptions.class); - - if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { - FileSystems.setDefaultPipelineOptions(pipelineOptions); - } - } - - /** - * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing - * for user specified configuration injection into the ObjectMapper. This supports user custom - * types on {@link PipelineOptions}. 
- */ - private static ObjectMapper createMapper() { - return new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java deleted file mode 100644 index 118ff99..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.apex.translation.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import com.datatorrent.common.util.FSStorageAgent; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.auto.service.AutoService; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Test; - -/** - * Tests the serialization of PipelineOptions. - */ -public class PipelineOptionsTest { - - /** - * Interface for testing. 
- */ - public interface MyOptions extends ApexPipelineOptions { - @Description("Bla bla bla") - @Default.String("Hello") - String getTestOption(); - void setTestOption(String value); - } - - private static class OptionsWrapper { - private OptionsWrapper() { - this(null); // required for Kryo - } - private OptionsWrapper(ApexPipelineOptions options) { - this.options = new SerializablePipelineOptions(options); - } - @Bind(JavaSerializer.class) - private final SerializablePipelineOptions options; - } - - @Test - public void testSerialization() { - OptionsWrapper wrapper = new OptionsWrapper( - PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - FSStorageAgent.store(bos, wrapper); - - ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis); - assertNotNull(wrapperCopy.options); - assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); - } - - @Test - public void testSerializationWithUserCustomType() { - OptionsWrapper wrapper = new OptionsWrapper( - PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"") - .as(JacksonIncompatibleOptions.class)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - FSStorageAgent.store(bos, wrapper); - - ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis); - assertNotNull(wrapperCopy.options); - assertEquals("testValue", - wrapperCopy.options.get().as(JacksonIncompatibleOptions.class) - .getJacksonIncompatible().value); - } - - /** PipelineOptions used to test auto registration of Jackson modules. 
*/ - public interface JacksonIncompatibleOptions extends ApexPipelineOptions { - JacksonIncompatible getJacksonIncompatible(); - void setJacksonIncompatible(JacksonIncompatible value); - } - - /** A Jackson {@link Module} to test auto-registration of modules. */ - @AutoService(Module.class) - public static class RegisteredTestModule extends SimpleModule { - public RegisteredTestModule() { - super("RegisteredTestModule"); - setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); - } - } - - /** A class which Jackson does not know how to serialize/deserialize. */ - public static class JacksonIncompatible { - private final String value; - public JacksonIncompatible(String value) { - this.value = value; - } - } - - /** A Jackson mixin used to add annotations to other classes. */ - @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) - @JsonSerialize(using = JacksonIncompatibleSerializer.class) - public static final class JacksonIncompatibleMixin {} - - /** A Jackson deserializer for {@link JacksonIncompatible}. */ - public static class JacksonIncompatibleDeserializer extends - JsonDeserializer<JacksonIncompatible> { - - @Override - public JacksonIncompatible deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - return new JacksonIncompatible(jsonParser.readValueAs(String.class)); - } - } - - /** A Jackson serializer for {@link JacksonIncompatible}. 
*/ - public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { - - @Override - public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { - jsonGenerator.writeString(jacksonIncompatible.value); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index b85b5f5..1a52914 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -65,6 +65,21 @@ </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java new file mode 100644 index 0000000..e697fb2 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.common.ReflectHelpers; + +/** + * Holds a {@link PipelineOptions} in JSON serialized form and calls {@link + * FileSystems#setDefaultPipelineOptions(PipelineOptions)} on construction or on deserialization. 
+ */ +public class SerializablePipelineOptions implements Serializable { + private static final ObjectMapper MAPPER = + new ObjectMapper() + .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + + private final String serializedPipelineOptions; + private transient PipelineOptions options; + + public SerializablePipelineOptions(PipelineOptions options) { + this.serializedPipelineOptions = serializeToJson(options); + this.options = options; + FileSystems.setDefaultPipelineOptions(options); + } + + public PipelineOptions get() { + return options; + } + + private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException { + is.defaultReadObject(); + this.options = deserializeFromJson(serializedPipelineOptions); + // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call. + FileSystems.setDefaultPipelineOptions(options); + } + + private static String serializeToJson(PipelineOptions options) { + try { + return MAPPER.writeValueAsString(options); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Failed to serialize PipelineOptions", e); + } + } + + private static PipelineOptions deserializeFromJson(String options) { + try { + return MAPPER.readValue(options, PipelineOptions.class); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to deserialize PipelineOptions", e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java new file mode 100644 index 0000000..cd470b2 --- /dev/null +++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertEquals; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.SerializableUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link SerializablePipelineOptions}. */ +@RunWith(JUnit4.class) +public class SerializablePipelineOptionsTest { + /** Options for testing. 
*/ + public interface MyOptions extends PipelineOptions { + String getFoo(); + + void setFoo(String foo); + + @JsonIgnore + @Default.String("not overridden") + String getIgnoredField(); + + void setIgnoredField(String value); + } + + @Test + public void testSerializationAndDeserialization() throws Exception { + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--foo=testValue", "--ignoredField=overridden") + .as(MyOptions.class); + + SerializablePipelineOptions serializableOptions = new SerializablePipelineOptions(options); + assertEquals("testValue", serializableOptions.get().as(MyOptions.class).getFoo()); + assertEquals("overridden", serializableOptions.get().as(MyOptions.class).getIgnoredField()); + + SerializablePipelineOptions copy = SerializableUtils.clone(serializableOptions); + assertEquals("testValue", copy.get().as(MyOptions.class).getFoo()); + assertEquals("not overridden", copy.get().as(MyOptions.class).getIgnoredField()); + } + + @Test + public void testIndependence() throws Exception { + SerializablePipelineOptions first = + new SerializablePipelineOptions( + PipelineOptionsFactory.fromArgs("--foo=first").as(MyOptions.class)); + SerializablePipelineOptions firstCopy = SerializableUtils.clone(first); + SerializablePipelineOptions second = + new SerializablePipelineOptions( + PipelineOptionsFactory.fromArgs("--foo=second").as(MyOptions.class)); + SerializablePipelineOptions secondCopy = SerializableUtils.clone(second); + + assertEquals("first", first.get().as(MyOptions.class).getFoo()); + assertEquals("first", firstCopy.get().as(MyOptions.class).getFoo()); + assertEquals("second", second.get().as(MyOptions.class).getFoo()); + assertEquals("second", secondCopy.get().as(MyOptions.class).getFoo()); + + first.get().as(MyOptions.class).setFoo("new first"); + firstCopy.get().as(MyOptions.class).setFoo("new firstCopy"); + second.get().as(MyOptions.class).setFoo("new second"); + secondCopy.get().as(MyOptions.class).setFoo("new secondCopy"); + + 
assertEquals("new first", first.get().as(MyOptions.class).getFoo()); + assertEquals("new firstCopy", firstCopy.get().as(MyOptions.class).getFoo()); + assertEquals("new second", second.get().as(MyOptions.class).getFoo()); + assertEquals("new secondCopy", secondCopy.get().as(MyOptions.class).getFoo()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index c063a2d..06746fd 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -256,16 +256,6 @@ </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index d8ed622..3048168 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -22,9 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.FlinkPipelineOptions; import 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; @@ -50,7 +50,7 @@ import org.apache.flink.util.Collector; public class FlinkDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> { - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; private final DoFn<InputT, OutputT> doFn; private final String stepName; @@ -75,7 +75,7 @@ public class FlinkDoFnFunction<InputT, OutputT> this.doFn = doFn; this.stepName = stepName; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(options); + this.serializedOptions = new SerializablePipelineOptions(options); this.windowingStrategy = windowingStrategy; this.outputMap = outputMap; this.mainOutputTag = mainOutputTag; @@ -101,7 +101,7 @@ public class FlinkDoFnFunction<InputT, OutputT> List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), doFn, + serializedOptions.get(), doFn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, @@ -109,7 +109,7 @@ public class FlinkDoFnFunction<InputT, OutputT> new FlinkNoOpStepContext(), windowingStrategy); - if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + if ((serializedOptions.get().as(FlinkPipelineOptions.class)) .getEnableMetrics()) { doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 13be913..c73dade 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -47,7 +47,7 @@ public class FlinkMergingNonShuffleReduceFunction< private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; public FlinkMergingNonShuffleReduceFunction( CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn, @@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction< this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); } @@ -69,7 +69,7 @@ public class FlinkMergingNonShuffleReduceFunction< 
Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - PipelineOptions options = serializedOptions.getPipelineOptions(); + PipelineOptions options = serializedOptions.get(); FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index db12a49..49e821c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -46,7 +46,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind protected final WindowingStrategy<Object, W> windowingStrategy; - protected final SerializedPipelineOptions serializedOptions; + protected final SerializablePipelineOptions serializedOptions; protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -59,7 +59,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind this.combineFn = 
combineFn; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); } @@ -68,7 +68,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - PipelineOptions options = serializedOptions.getPipelineOptions(); + PipelineOptions options = serializedOptions.get(); FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 53d71d8..6645b3a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -48,7 +48,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - protected 
final SerializedPipelineOptions serializedOptions; + protected final SerializablePipelineOptions serializedOptions; public FlinkReduceFunction( CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn, @@ -61,7 +61,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); } @@ -70,7 +70,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> Iterable<WindowedValue<KV<K, AccumT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - PipelineOptions options = serializedOptions.getPipelineOptions(); + PipelineOptions options = serializedOptions.get(); FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 11d4fee..412269c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -31,9 +31,9 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; +import 
org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -61,7 +61,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> private String stepName; private final WindowingStrategy<?, ?> windowingStrategy; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; private final Map<TupleTag<?>, Integer> outputMap; private final TupleTag<OutputT> mainOutputTag; private transient DoFnInvoker doFnInvoker; @@ -79,7 +79,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> this.stepName = stepName; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); this.outputMap = outputMap; this.mainOutputTag = mainOutputTag; } @@ -118,7 +118,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), dofn, + serializedOptions.get(), dofn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, @@ -135,7 +135,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> }, windowingStrategy); - if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + if ((serializedOptions.get().as(FlinkPipelineOptions.class)) .getEnableMetrics()) { doFnRunner = new 
DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java deleted file mode 100644 index 40b6dd6..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.flink.translation.utils; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.common.ReflectHelpers; - -/** - * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. - */ -public class SerializedPipelineOptions implements Serializable { - - private final byte[] serializedOptions; - - /** Lazily initialized copy of deserialized options. */ - private transient PipelineOptions pipelineOptions; - - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - createMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - - } - - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); - - FileSystems.setDefaultPipelineOptions(pipelineOptions); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } - - return pipelineOptions; - } - - /** - * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing - * for user specified configuration injection into the ObjectMapper. This supports user custom - * types on {@link PipelineOptions}. 
- */ - private static ObjectMapper createMapper() { - return new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 27e6912..3f9d601 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -19,9 +19,9 @@ package org.apache.beam.runners.flink.translation.wrappers; import java.io.IOException; import java.util.List; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; @@ -50,7 +50,7 @@ public class SourceInputFormat<T> private final BoundedSource<T> initialSource; private transient PipelineOptions options; - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; private transient BoundedSource.BoundedReader<T> reader; private boolean inputAvailable = false; @@ -61,12 +61,12 @@ public class SourceInputFormat<T> String stepName, BoundedSource<T> initialSource, PipelineOptions options) { this.stepName = stepName; this.initialSource = initialSource; - 
this.serializedOptions = new SerializedPipelineOptions(options); + this.serializedOptions = new SerializablePipelineOptions(options); } @Override public void configure(Configuration configuration) { - options = serializedOptions.getPipelineOptions(); + options = serializedOptions.get(); } @Override @@ -76,7 +76,7 @@ public class SourceInputFormat<T> readerInvoker = new ReaderInvocationUtil<>( stepName, - serializedOptions.getPipelineOptions(), + serializedOptions.get(), metricContainer); reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 7995ea8..62de423 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -47,10 +47,10 @@ import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals; @@ -106,7 +106,7 @@ public class DoFnOperator<InputT, OutputT> protected DoFn<InputT, OutputT> doFn; - protected final SerializedPipelineOptions serializedOptions; + protected final SerializablePipelineOptions serializedOptions; protected final TupleTag<OutputT> mainOutputTag; protected final List<TupleTag<?>> additionalOutputTags; @@ -174,7 +174,7 @@ public class DoFnOperator<InputT, OutputT> this.additionalOutputTags = additionalOutputTags; this.sideInputTagMapping = sideInputTagMapping; this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(options); + this.serializedOptions = new SerializablePipelineOptions(options); this.windowingStrategy = windowingStrategy; this.outputManagerFactory = outputManagerFactory; @@ -256,7 +256,7 @@ public class DoFnOperator<InputT, OutputT> org.apache.beam.runners.core.StepContext stepContext = createStepContext(); doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), + serializedOptions.get(), doFn, sideInputReader, outputManager, @@ -301,7 +301,7 @@ public class DoFnOperator<InputT, OutputT> stateCleaner); } - if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + if ((serializedOptions.get().as(FlinkPipelineOptions.class)) .getEnableMetrics()) { doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 2f095d4..be758a6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -115,7 +115,7 @@ public class SplittableDoFnOperator< ((ProcessFn) doFn).setProcessElementInvoker( new OutputAndTimeBoundedSplittableProcessElementInvoker<>( doFn, - serializedOptions.getPipelineOptions(), + serializedOptions.get(), new OutputWindowedValue<OutputT>() { @Override public void outputWindowedValue( http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 6d75688..5ddc46f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -20,9 +20,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import 
org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -48,7 +48,7 @@ public class BoundedSourceWrapper<OutputT> /** * Keep the options so that we can initialize the readers. */ - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; /** * The split sources. We split them in the constructor to ensure that all parallel @@ -74,7 +74,7 @@ public class BoundedSourceWrapper<OutputT> BoundedSource<OutputT> source, int parallelism) throws Exception { this.stepName = stepName; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism; @@ -109,13 +109,13 @@ public class BoundedSourceWrapper<OutputT> ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker = new ReaderInvocationUtil<>( stepName, - serializedOptions.getPipelineOptions(), + serializedOptions.get(), metricContainer); readers = new ArrayList<>(); // initialize readers from scratch for (BoundedSource<OutputT> source : localSources) { - readers.add(source.createReader(serializedOptions.getPipelineOptions())); + readers.add(source.createReader(serializedOptions.get())); } if (readers.size() == 1) { http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index e75072a..817dd74 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -72,7 +72,7 @@ public class UnboundedSourceWrapper< /** * Keep the options so that we can initialize the localReaders. */ - private final SerializedPipelineOptions serializedOptions; + private final SerializablePipelineOptions serializedOptions; /** * For snapshot and restore. 
@@ -141,7 +141,7 @@ public class UnboundedSourceWrapper< UnboundedSource<OutputT, CheckpointMarkT> source, int parallelism) throws Exception { this.stepName = stepName; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); if (source.requiresDeduping()) { LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source); @@ -189,7 +189,7 @@ public class UnboundedSourceWrapper< stateForCheckpoint.get()) { localSplitSources.add(restored.getKey()); localReaders.add(restored.getKey().createReader( - serializedOptions.getPipelineOptions(), restored.getValue())); + serializedOptions.get(), restored.getValue())); } } else { // initialize localReaders and localSources from scratch @@ -198,7 +198,7 @@ public class UnboundedSourceWrapper< UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i); UnboundedSource.UnboundedReader<OutputT> reader = - source.createReader(serializedOptions.getPipelineOptions(), null); + source.createReader(serializedOptions.get(), null); localSplitSources.add(source); localReaders.add(reader); } @@ -221,7 +221,7 @@ public class UnboundedSourceWrapper< ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = new ReaderInvocationUtil<>( stepName, - serializedOptions.getPipelineOptions(), + serializedOptions.get(), metricContainer); if (localReaders.size() == 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index d0281ec..eb06026 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -17,32 +17,8 @@ */ package org.apache.beam.runners.flink; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.auto.service.AutoService; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.Collections; import java.util.HashMap; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.Default; @@ -60,12 +36,10 @@ import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import 
org.joda.time.Instant; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; /** @@ -73,9 +47,7 @@ import org.junit.Test; */ public class PipelineOptionsTest { - /** - * Pipeline options. - */ + /** Pipeline options. */ public interface MyOptions extends FlinkPipelineOptions { @Description("Bla bla bla") @Default.String("Hello") @@ -83,60 +55,12 @@ public class PipelineOptionsTest { void setTestOption(String value); } - private static MyOptions options; - private static SerializedPipelineOptions serializedOptions; - - private static final String[] args = new String[]{"--testOption=nothing"}; - - @BeforeClass - public static void beforeTest() { - options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); - serializedOptions = new SerializedPipelineOptions(options); - } - - @Test - public void testDeserialization() { - MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); - assertEquals("nothing", deserializedOptions.getTestOption()); - } - - @Test - public void testIgnoredFieldSerialization() { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - options.setStateBackend(new MemoryStateBackend()); - - FlinkPipelineOptions deserialized = - new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class); - - assertNull(deserialized.getStateBackend()); - } - - @Test - public void testEnableMetrics() { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - options.setEnableMetrics(false); - assertFalse(options.getEnableMetrics()); - } - - @Test - public void testCaching() { - PipelineOptions deserializedOptions = - serializedOptions.getPipelineOptions().as(PipelineOptions.class); - - assertNotNull(deserializedOptions); - assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); - assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); - assertTrue(deserializedOptions 
== serializedOptions.getPipelineOptions()); - } - - @Test(expected = Exception.class) - public void testNonNull() { - new SerializedPipelineOptions(null); - } + private static MyOptions options = + PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class); @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { - DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( + new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -196,18 +120,7 @@ public class PipelineOptionsTest { } - @Test - public void testExternalizedCheckpointsConfigs() { - String[] args = new String[] { "--externalizedCheckpointsEnabled=true", - "--retainExternalizedCheckpointsOnCancellation=false" }; - final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args) - .as(FlinkPipelineOptions.class); - assertEquals(options.isExternalizedCheckpointsEnabled(), true); - assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false); - } - private static class TestDoFn extends DoFn<String, String> { - @ProcessElement public void processElement(ProcessContext c) throws Exception { Assert.assertNotNull(c.getPipelineOptions()); @@ -216,74 +129,4 @@ public class PipelineOptionsTest { c.getPipelineOptions().as(MyOptions.class).getTestOption()); } } - - /** PipelineOptions used to test auto registration of Jackson modules. */ - public interface JacksonIncompatibleOptions extends PipelineOptions { - JacksonIncompatible getJacksonIncompatible(); - void setJacksonIncompatible(JacksonIncompatible value); - } - - /** A Jackson {@link Module} to test auto-registration of modules. 
*/ - @AutoService(Module.class) - public static class RegisteredTestModule extends SimpleModule { - public RegisteredTestModule() { - super("RegisteredTestModule"); - setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); - } - } - - /** A class which Jackson does not know how to serialize/deserialize. */ - public static class JacksonIncompatible { - private final String value; - public JacksonIncompatible(String value) { - this.value = value; - } - } - - /** A Jackson mixin used to add annotations to other classes. */ - @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) - @JsonSerialize(using = JacksonIncompatibleSerializer.class) - public static final class JacksonIncompatibleMixin {} - - /** A Jackson deserializer for {@link JacksonIncompatible}. */ - public static class JacksonIncompatibleDeserializer extends - JsonDeserializer<JacksonIncompatible> { - - @Override - public JacksonIncompatible deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - return new JacksonIncompatible(jsonParser.readValueAs(String.class)); - } - } - - /** A Jackson serializer for {@link JacksonIncompatible}. 
*/ - public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { - - @Override - public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { - jsonGenerator.writeString(jacksonIncompatible.value); - } - } - - @Test - public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { - String expectedValue = "testValue"; - PipelineOptions options = PipelineOptionsFactory - .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"") - .as(JacksonIncompatibleOptions.class); - SerializedPipelineOptions context = new SerializedPipelineOptions(options); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) { - outputStream.writeObject(context); - } - try (ObjectInputStream inputStream = - new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) { - SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject(); - assertEquals(expectedValue, - copy.getPipelineOptions().as(JacksonIncompatibleOptions.class) - .getJacksonIncompatible().value); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e823060..b2e7fe4 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -35,7 +35,6 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <kafka.version>0.9.0.1</kafka.version> - <jackson.version>2.4.4</jackson.version> <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version> </properties> @@ -184,18 +183,7 @@ </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - 
<version>${jackson.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> - <version>${jackson.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index 27f2ec8..a9f2c44 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -19,18 +19,11 @@ package org.apache.beam.runners.spark.aggregators; import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Map; import java.util.TreeMap; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; /** @@ -52,17 +45,6 @@ public class NamedAggregators implements Serializable { } /** - * Constructs a new named aggregators instance that contains a mapping from the specified - * `named` to the associated initial state. - * - * @param name Name of aggregator. 
- * @param state Associated State. - */ - public NamedAggregators(String name, State<?, ?, ?> state) { - this.mNamedAggregators.put(name, state); - } - - /** * @param name Name of aggregator to retrieve. * @param typeClass Type class to cast the value to. * @param <T> Type to be returned. @@ -152,79 +134,4 @@ public class NamedAggregators implements Serializable { Combine.CombineFn<InputT, InterT, OutputT> getCombineFn(); } - /** - * @param <InputT> Input data type - * @param <InterT> Intermediate data type (useful for averages) - * @param <OutputT> Output data type - */ - public static class CombineFunctionState<InputT, InterT, OutputT> - implements State<InputT, InterT, OutputT> { - - private Combine.CombineFn<InputT, InterT, OutputT> combineFn; - private Coder<InputT> inCoder; - private SparkRuntimeContext ctxt; - private transient InterT state; - - public CombineFunctionState( - Combine.CombineFn<InputT, InterT, OutputT> combineFn, - Coder<InputT> inCoder, - SparkRuntimeContext ctxt) { - this.combineFn = combineFn; - this.inCoder = inCoder; - this.ctxt = ctxt; - this.state = combineFn.createAccumulator(); - } - - @Override - public void update(InputT element) { - combineFn.addInput(state, element); - } - - @Override - public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) { - this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current())); - return this; - } - - @Override - public InterT current() { - return state; - } - - @Override - public OutputT render() { - return combineFn.extractOutput(state); - } - - @Override - public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() { - return combineFn; - } - - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.writeObject(ctxt); - oos.writeObject(combineFn); - oos.writeObject(inCoder); - try { - combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .encode(state, oos); - } catch (CannotProvideCoderException e) { - throw 
new IllegalStateException("Could not determine coder for accumulator", e); - } - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ctxt = (SparkRuntimeContext) ois.readObject(); - combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject(); - inCoder = (Coder<InputT>) ois.readObject(); - try { - state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .decode(ois); - } catch (CannotProvideCoderException e) { - throw new IllegalStateException("Could not determine coder for accumulator", e); - } - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 20aca5f..b7000b4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -20,8 +20,8 @@ package org.apache.beam.runners.spark.io; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.spark.api.java.JavaSparkContext$; @@ -58,7 +58,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class); private final UnboundedSource<T, CheckpointMarkT> unboundedSource; - private final SparkRuntimeContext runtimeContext; + private final 
SerializablePipelineOptions options; private final Duration boundReadDuration; // Reader cache interval to expire readers if they haven't been accessed in the last microbatch. // The reason we expire readers is that upon executor death/addition source split ownership can be @@ -81,20 +81,20 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> SourceDStream( StreamingContext ssc, UnboundedSource<T, CheckpointMarkT> unboundedSource, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, Long boundMaxRecords) { super(ssc, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag()); this.unboundedSource = unboundedSource; - this.runtimeContext = runtimeContext; + this.options = options; - SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( + SparkPipelineOptions sparkOptions = options.get().as( SparkPipelineOptions.class); // Reader cache expiration interval. 50% of batch interval is added to accommodate latency. - this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis(); + this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis(); - this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), - options.getMinReadTimeMillis()); + this.boundReadDuration = boundReadDuration(sparkOptions.getReadTimePercentage(), + sparkOptions.getMinReadTimeMillis()); // set initial parallelism once. 
this.initialParallelism = ssc().sparkContext().defaultParallelism(); checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero."); @@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> try { this.numPartitions = createMicrobatchSource() - .split(options) + .split(sparkOptions) .size(); } catch (Exception e) { throw new RuntimeException(e); @@ -116,7 +116,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>( ssc().sparkContext(), - runtimeContext, + options, createMicrobatchSource(), numPartitions); return scala.Option.apply(rdd); http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 01cc176..a225e0f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -28,9 +28,9 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; @@ -66,7 +66,7 @@ public class SourceRDD { private static final Logger LOG = LoggerFactory.getLogger(SourceRDD.Bounded.class); private final BoundedSource<T> 
source; - private final SparkRuntimeContext runtimeContext; + private final SerializablePipelineOptions options; private final int numPartitions; private final String stepName; private final Accumulator<MetricsContainerStepMap> metricsAccum; @@ -79,11 +79,11 @@ public class SourceRDD { public Bounded( SparkContext sc, BoundedSource<T> source, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, String stepName) { super(sc, NIL, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag()); this.source = source; - this.runtimeContext = runtimeContext; + this.options = options; // the input parallelism is determined by Spark's scheduler backend. // when running on YARN/SparkDeploy it's the result of max(totalCores, 2). // when running on Mesos it's 8. @@ -103,14 +103,14 @@ public class SourceRDD { long desiredSizeBytes = DEFAULT_BUNDLE_SIZE; try { desiredSizeBytes = source.getEstimatedSizeBytes( - runtimeContext.getPipelineOptions()) / numPartitions; + options.get()) / numPartitions; } catch (Exception e) { LOG.warn("Failed to get estimated bundle size for source {}, using default bundle " + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE); } try { List<? 
extends Source<T>> partitionedSources = source.split(desiredSizeBytes, - runtimeContext.getPipelineOptions()); + options.get()); Partition[] partitions = new SourcePartition[partitionedSources.size()]; for (int i = 0; i < partitionedSources.size(); i++) { partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i)); @@ -125,7 +125,7 @@ public class SourceRDD { private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) { try { return ((BoundedSource<T>) partition.source).createReader( - runtimeContext.getPipelineOptions()); + options.get()); } catch (IOException e) { throw new RuntimeException("Failed to create reader from a BoundedSource.", e); } @@ -293,7 +293,7 @@ public class SourceRDD { UnboundedSource.CheckpointMark> extends RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> { private final MicrobatchSource<T, CheckpointMarkT> microbatchSource; - private final SparkRuntimeContext runtimeContext; + private final SerializablePipelineOptions options; private final Partitioner partitioner; // to satisfy Scala API. @@ -302,12 +302,12 @@ public class SourceRDD { .asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList(); public Unbounded(SparkContext sc, - SparkRuntimeContext runtimeContext, + SerializablePipelineOptions options, MicrobatchSource<T, CheckpointMarkT> microbatchSource, int initialNumPartitions) { super(sc, NIL, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag()); - this.runtimeContext = runtimeContext; + this.options = options; this.microbatchSource = microbatchSource; this.partitioner = new HashPartitioner(initialNumPartitions); } @@ -316,7 +316,7 @@ public class SourceRDD { public Partition[] getPartitions() { try { final List<? 
extends Source<T>> partitionedSources = - microbatchSource.split(runtimeContext.getPipelineOptions()); + microbatchSource.split(options.get()); final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()]; for (int i = 0; i < partitionedSources.size(); i++) { partitions[i] = http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 7106c73..b31aa9f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -22,12 +22,12 @@ import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.Collections; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.stateful.StateSpecFunctions; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; @@ -80,11 +80,11 @@ public class SparkUnboundedSource { public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read( JavaStreamingContext jssc, - SparkRuntimeContext rc, + SerializablePipelineOptions rc, UnboundedSource<T, 
CheckpointMarkT> source, String stepName) { - SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class); + SparkPipelineOptions options = rc.get().as(SparkPipelineOptions.class); Long maxRecordsPerBatch = options.getMaxRecordsPerBatch(); SourceDStream<T, CheckpointMarkT> sourceDStream = new SourceDStream<>(jssc.ssc(), source, rc, maxRecordsPerBatch);