This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3839281  [BEAM-3344] Changes for serialize/deserialize PipelineOptions 
via jackson in DirectRunner (#8302)
3839281 is described below

commit 3839281c0ecdc98ea8571db6f49f1e818ed7a696
Author: sudhan499 <[email protected]>
AuthorDate: Wed May 1 02:08:06 2019 +0530

    [BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson 
in DirectRunner (#8302)
    
    * [BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson 
in DirectRunner
    
    * [BEAM-3344] Serialize/deserialize PipelineOptions via Jackson in 
DirectRunner
    
    * [BEAM-3344] Changes for serialize/deserialize PipelineOptions via jackson 
in DirectRunner
    
    * [BEAM-3344] Minor fix Marking interface as public
    
    * [BEAM-3344] Fixing Checkstyle issue.
    
    * Update 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * Update 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
    
    Co-Authored-By: sudhan499 <[email protected]>
    
    * [BEAM-3344] Applying spotless java changes
    
    * Move PipelineOptions serialization/deserialization to when run is 
invoked, so that
    RuntimeValueProvider is bound correctly.
---
 .../apache/beam/runners/direct/DirectRunner.java   | 18 +++++++-
 .../beam/runners/direct/DirectRunnerTest.java      | 51 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index fbc1dd8..757ac3b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
@@ -127,9 +130,12 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
   }
 
   
////////////////////////////////////////////////////////////////////////////////////////////////
-  private final DirectOptions options;
+  private DirectOptions options;
   private final Set<Enforcement> enabledEnforcements;
   private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   /** Construct a {@link DirectRunner} from the provided options. */
   public static DirectRunner fromOptions(PipelineOptions options) {
@@ -151,6 +157,16 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
 
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
+    try {
+      options =
+          MAPPER
+              .readValue(MAPPER.writeValueAsBytes(options), 
PipelineOptions.class)
+              .as(DirectOptions.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "PipelineOptions specified failed to serialize to JSON.", e);
+    }
+
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     try {
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index d3ede31..2d951ef 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -22,9 +22,11 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -55,6 +57,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -581,6 +584,54 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
+  /**
+   * Tests that {@link DirectRunner#run(Pipeline)} drops {@link 
PipelineOptions}
+   * properties marked with {@link JsonIgnore}.
+   */
+  @Test
+  public void testFromOptionsIfIgnoredFieldsGettingDropped() {
+    TestSerializationOfOptions options =
+        PipelineOptionsFactory.fromArgs(
+                "--foo=testValue", "--ignoredField=overridden", 
"--runner=DirectRunner")
+            .as(TestSerializationOfOptions.class);
+
+    assertEquals("testValue", options.getFoo());
+    assertEquals("overridden", options.getIgnoredField());
+    Pipeline p = Pipeline.create(options);
+    PCollection<Integer> pc =
+        p.apply(Create.of("1"))
+            .apply(
+                ParDo.of(
+                    new DoFn<String, Integer>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        TestSerializationOfOptions options =
+                            
c.getPipelineOptions().as(TestSerializationOfOptions.class);
+                        assertEquals("testValue", options.getFoo());
+                        assertEquals("not overridden", 
options.getIgnoredField());
+                        c.output(Integer.parseInt(c.element()));
+                      }
+                    }));
+    PAssert.that(pc).containsInAnyOrder(1);
+    p.run();
+  }
+
+  /**
+   * Options for testing if {@link DirectRunner} drops {@link PipelineOptions} 
properties marked
+   * with {@link JsonIgnore}.
+   */
+  public interface TestSerializationOfOptions extends PipelineOptions {
+    String getFoo();
+
+    void setFoo(String foo);
+
+    @JsonIgnore
+    @Default.String("not overridden")
+    String getIgnoredField();
+
+    void setIgnoredField(String value);
+  }
+
   private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(Long value, OutputStream outStream) throws IOException 
{}

Reply via email to