This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3839281 [BEAM-3344] Changes for serialize/deserialize PipelineOptions
via jackson in DirectRunner (#8302)
3839281 is described below
commit 3839281c0ecdc98ea8571db6f49f1e818ed7a696
Author: sudhan499 <[email protected]>
AuthorDate: Wed May 1 02:08:06 2019 +0530
[BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson
in DirectRunner (#8302)
* [BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson
in DirectRunner
* [BEAM-3344] Serialize/deserialize PipelineOptions via Jackson in
DirectRunner
* [BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson
in DirectRunner
* [BEAM-3344] Minor fix Marking interface as public
* [BEAM-3344] Fixing Checkstyle issue.
* Update
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* Update
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
Co-Authored-By: sudhan499 <[email protected]>
* [BEAM-3344] Applying spotless java changes
* Defer PipelineOptions serialization/deserialization until run is
invoked, so that
RuntimeValueProvider is bound correctly.
---
.../apache/beam/runners/direct/DirectRunner.java | 18 +++++++-
.../beam/runners/direct/DirectRunnerTest.java | 51 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index fbc1dd8..757ac3b 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
@@ -127,9 +130,12 @@ public class DirectRunner extends
PipelineRunner<DirectPipelineResult> {
}
////////////////////////////////////////////////////////////////////////////////////////////////
- private final DirectOptions options;
+ private DirectOptions options;
private final Set<Enforcement> enabledEnforcements;
private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
/** Construct a {@link DirectRunner} from the provided options. */
public static DirectRunner fromOptions(PipelineOptions options) {
@@ -151,6 +157,16 @@ public class DirectRunner extends
PipelineRunner<DirectPipelineResult> {
@Override
public DirectPipelineResult run(Pipeline pipeline) {
+ try {
+ options =
+ MAPPER
+ .readValue(MAPPER.writeValueAsBytes(options),
PipelineOptions.class)
+ .as(DirectOptions.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ "PipelineOptions specified failed to serialize to JSON.", e);
+ }
+
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
try {
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index d3ede31..2d951ef 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -22,9 +22,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -55,6 +57,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -581,6 +584,54 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
+ /**
+ * Tests that {@link DirectRunner#fromOptions(PipelineOptions)} drops {@link
PipelineOptions}
+ * marked with {@link JsonIgnore} fields.
+ */
+ @Test
+ public void testFromOptionsIfIgnoredFieldsGettingDropped() {
+ TestSerializationOfOptions options =
+ PipelineOptionsFactory.fromArgs(
+ "--foo=testValue", "--ignoredField=overridden",
"--runner=DirectRunner")
+ .as(TestSerializationOfOptions.class);
+
+ assertEquals("testValue", options.getFoo());
+ assertEquals("overridden", options.getIgnoredField());
+ Pipeline p = Pipeline.create(options);
+ PCollection<Integer> pc =
+ p.apply(Create.of("1"))
+ .apply(
+ ParDo.of(
+ new DoFn<String, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ TestSerializationOfOptions options =
+
c.getPipelineOptions().as(TestSerializationOfOptions.class);
+ assertEquals("testValue", options.getFoo());
+ assertEquals("not overridden",
options.getIgnoredField());
+ c.output(Integer.parseInt(c.element()));
+ }
+ }));
+ PAssert.that(pc).containsInAnyOrder(1);
+ p.run();
+ }
+
+ /**
+ * Options for testing if {@link DirectRunner} drops {@link PipelineOptions}
marked with {@link
+ * JsonIgnore} fields.
+ */
+ public interface TestSerializationOfOptions extends PipelineOptions {
+ String getFoo();
+
+ void setFoo(String foo);
+
+ @JsonIgnore
+ @Default.String("not overridden")
+ String getIgnoredField();
+
+ void setIgnoredField(String value);
+ }
+
private static class LongNoDecodeCoder extends AtomicCoder<Long> {
@Override
public void encode(Long value, OutputStream outStream) throws IOException
{}