[ https://issues.apache.org/jira/browse/BEAM-4733?focusedWorklogId=120982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120982 ]

ASF GitHub Bot logged work on BEAM-4733:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/18 19:42
            Start Date: 09/Jul/18 19:42
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #5888: [BEAM-4733] Pass pipeline options from Python portable runner to job server.
URL: https://github.com/apache/beam/pull/5888

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
index 4cdca616308..efe61e7d389 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
@@ -18,14 +18,24 @@
 
 package org.apache.beam.runners.core.construction;
 
+import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.CaseFormat;
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Struct;
 import com.google.protobuf.util.JsonFormat;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
-/** Utilities for going to/from Runner API pipeline options. */
+/**
+ * Utilities for going to/from Runner API pipeline options.
+ *
+ * <p>TODO: Make this the default to/from translation for PipelineOptions.
+ */
 public class PipelineOptionsTranslation {
   private static final ObjectMapper MAPPER =
       new ObjectMapper()
@@ -34,10 +44,26 @@
   /** Converts the provided {@link PipelineOptions} to a {@link Struct}. */
   public static Struct toProto(PipelineOptions options) {
     Struct.Builder builder = Struct.newBuilder();
+
     try {
+      // TODO: Officially define URNs for options and their scheme.
+      TreeNode treeNode = MAPPER.valueToTree(options);
+      TreeNode rootOptions = treeNode.get("options");
+      Iterator<String> optionsKeys = rootOptions.fieldNames();
+      Map<String, TreeNode> optionsUsingUrns = new HashMap<>();
+      while (optionsKeys.hasNext()) {
+        String optionKey = optionsKeys.next();
+        TreeNode optionValue = rootOptions.get(optionKey);
+        optionsUsingUrns.put(
+            "beam:option:"
+                + CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, optionKey)
+                + ":v1",
+            optionValue);
+      }
+
       // The JSON format of a Protobuf Struct is the JSON object that is equivalent to that struct
       // (with values encoded in a standard json-codeable manner). See Beam PR 3719 for more.
-      JsonFormat.parser().merge(MAPPER.writeValueAsString(options), builder);
+      JsonFormat.parser().merge(MAPPER.writeValueAsString(optionsUsingUrns), builder);
       return builder.build();
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -46,6 +72,20 @@ public static Struct toProto(PipelineOptions options) {
 
   /** Converts the provided {@link Struct} into {@link PipelineOptions}. */
   public static PipelineOptions fromProto(Struct protoOptions) throws IOException {
-    return MAPPER.readValue(JsonFormat.printer().print(protoOptions), PipelineOptions.class);
+    Map<String, TreeNode> mapWithoutUrns = new HashMap<>();
+    TreeNode rootOptions = MAPPER.readTree(JsonFormat.printer().print(protoOptions));
+    Iterator<String> optionsKeys = rootOptions.fieldNames();
+    while (optionsKeys.hasNext()) {
+      String optionKey = optionsKeys.next();
+      TreeNode optionValue = rootOptions.get(optionKey);
+      mapWithoutUrns.put(
+          CaseFormat.LOWER_UNDERSCORE.to(
+              CaseFormat.LOWER_CAMEL,
+              optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())),
+          optionValue);
+    }
+    return MAPPER.readValue(
+        MAPPER.writeValueAsString(ImmutableMap.of("options", mapWithoutUrns)),
+        PipelineOptions.class);
   }
 }
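
For readers of the diff: the key scheme introduced above maps each camelCase option name in the Jackson tree of PipelineOptions to a URN-style Struct key and back. Below is a minimal, self-contained sketch of that mapping, using Guava's CaseFormat and a made-up option name; it is illustrative only and not code from this PR.

import com.google.common.base.CaseFormat;

public class OptionUrnSketch {
  public static void main(String[] args) {
    // Hypothetical option name, chosen only to show the case conversion.
    String optionKey = "jobName";

    // toProto() direction: camelCase -> beam:option:<snake_case>:v1
    String urn = "beam:option:"
        + CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, optionKey)
        + ":v1";
    System.out.println(urn);  // beam:option:job_name:v1

    // fromProto() direction: strip the prefix and version suffix, then camelCase again.
    String stripped = urn.substring("beam:option:".length(), urn.length() - ":v1".length());
    String roundTripped = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, stripped);
    System.out.println(roundTripped);  // jobName
  }
}

The prefix and version suffix here are still ad hoc, matching the TODO in the diff about officially defining URNs for options and their scheme.
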
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
index adf4759733c..0faa1db2984 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
@@ -130,7 +130,9 @@ public void emptyStructDeserializes() throws Exception {
     public void structWithNullOptionsDeserializes() throws Exception {
       Struct serialized =
           Struct.newBuilder()
-              .putFields("options", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
+              .putFields(
+                  "beam:option:option_key:v1",
+                  Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
               .build();
       PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index dd6c6a3ff58..348d99fface 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -450,6 +450,7 @@ private void translateImpulse(
         FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
     Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
     Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
+    Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap();
     // order output names for deterministic mapping
     for (String localOutputName : new TreeMap<>(outputIndexMap).keySet()) {
       String collectionId = outputs.get(localOutputName);
@@ -461,6 +462,7 @@ private void translateImpulse(
       tagsToOutputTags.put(tupleTag, new OutputTag<>(localOutputName, typeInformation));
       tagsToCoders.put(tupleTag, windowCoder);
       tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName));
+      collectionIdToTupleTag.put(collectionId, tupleTag);
     }
 
     final SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream;
@@ -487,7 +489,7 @@ private void translateImpulse(
 
     // TODO: side inputs
     DoFnOperator<InputT, OutputT> doFnOperator =
-        new ExecutableStageDoFnOperator<InputT, OutputT>(
+        new ExecutableStageDoFnOperator<>(
             transform.getUniqueName(),
             inputCoder,
             mainOutputTag,
@@ -498,7 +500,8 @@ private void translateImpulse(
             context.getPipelineOptions(),
             stagePayload,
             context.getJobInfo(),
-            FlinkExecutableStageContext.batchFactory());
+            FlinkExecutableStageContext.batchFactory(),
+            collectionIdToTupleTag);
 
     outputStream =
         inputDataStream.transform(transform.getUniqueName(), outputTypeInformation, doFnOperator);
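
For readers of the diff: the translator now keys the operator's output map by PCollection id, collected while it walks the transform's outputs in deterministic order, instead of letting the operator key it by TupleTag id. Below is a small, self-contained sketch of that keying with made-up collection ids and plain Strings standing in for TupleTags; it is illustrative only and not code from this PR.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class CollectionIdOutputMapSketch {
  public static void main(String[] args) {
    // Hypothetical transform outputs: local output name -> PCollection id.
    Map<String, String> outputs = new HashMap<>();
    outputs.put("out0", "ref_PCollection_main");
    outputs.put("out1", "ref_PCollection_side");

    // Iterate local names in sorted order (mirroring the TreeMap in the translator)
    // and key the result by PCollection id, as now passed to ExecutableStageDoFnOperator.
    Map<String, String> collectionIdToTupleTag = new HashMap<>();
    for (Map.Entry<String, String> entry : new TreeMap<>(outputs).entrySet()) {
      String localOutputName = entry.getKey();
      String collectionId = entry.getValue();
      // stand-in for new TupleTag<>(localOutputName)
      collectionIdToTupleTag.put(collectionId, localOutputName);
    }
    System.out.println(collectionIdToTupleTag);
  }
}
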
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index fbd10edae1e..cf8e7345e1c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -20,7 +20,6 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
@@ -84,7 +83,8 @@ public ExecutableStageDoFnOperator(
       PipelineOptions options,
       RunnerApi.ExecutableStagePayload payload,
       JobInfo jobInfo,
-      FlinkExecutableStageContext.Factory contextFactory) {
+      FlinkExecutableStageContext.Factory contextFactory,
+      Map<String, TupleTag<?>> outputMap) {
     super(
         new NoOpDoFn(),
         stepName,
@@ -101,19 +101,7 @@ public ExecutableStageDoFnOperator(
     this.payload = payload;
     this.jobInfo = jobInfo;
     this.contextFactory = contextFactory;
-    this.outputMap = createOutputMap(mainOutputTag, additionalOutputTags);
-  }
-
-  private static Map<String, TupleTag<?>> createOutputMap(
-      TupleTag mainOutput, List<TupleTag<?>> additionalOutputs) {
-    Map<String, TupleTag<?>> outputMap = new HashMap<>(additionalOutputs.size() + 1);
-    if (mainOutput != null) {
-      outputMap.put(mainOutput.getId(), mainOutput);
-    }
-    for (TupleTag<?> additionalTag : additionalOutputs) {
-      outputMap.put(additionalTag.getId(), additionalTag);
-    }
-    return outputMap;
+    this.outputMap = outputMap;
   }
 
   @Override
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 9136eb5cb90..33cafada639 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -32,7 +32,9 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Struct;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
@@ -325,7 +327,8 @@ public void testSerialization() {
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
             stagePayload,
             jobInfo,
-            FlinkExecutableStageContext.batchFactory());
+            FlinkExecutableStageContext.batchFactory(),
+            createOutputMap(mainOutput, ImmutableList.of(additionalOutput)));
 
     ExecutableStageDoFnOperator<Integer, Integer> clone = SerializationUtils.clone(operator);
     assertNotNull(clone);
@@ -358,8 +361,21 @@ public void testSerialization() {
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
             stagePayload,
             jobInfo,
-            contextFactory);
+            contextFactory,
+            createOutputMap(mainOutput, additionalOutputs));
 
     return operator;
   }
+
+  private static Map<String, TupleTag<?>> createOutputMap(
+      TupleTag mainOutput, List<TupleTag<?>> additionalOutputs) {
+    Map<String, TupleTag<?>> outputMap = new HashMap<>(additionalOutputs.size() + 1);
+    if (mainOutput != null) {
+      outputMap.put(mainOutput.getId(), mainOutput);
+    }
+    for (TupleTag<?> additionalTag : additionalOutputs) {
+      outputMap.put(additionalTag.getId(), additionalTag);
+    }
+    return outputMap;
+  }
 }
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 9ad35378f39..d2e6b91c9f8 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -30,6 +30,7 @@
 from apache_beam.portability.api import beam_job_api_pb2_grpc
 from apache_beam.runners import pipeline_context
 from apache_beam.runners import runner
+from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import portable_stager
 
 __all__ = ['PortableRunner']
@@ -100,11 +101,17 @@ def run_pipeline(self, pipeline):
           del proto_pipeline.components.transforms[sub_transform]
         del transform_proto.subtransforms[:]
 
+    # TODO: Define URNs for options.
+    options = {'beam:option:' + k + ':v1': v
+               for k, v in pipeline._options.get_all_options().iteritems()
+               if v is not None}
+
     job_service = beam_job_api_pb2_grpc.JobServiceStub(
         grpc.insecure_channel(job_endpoint))
     prepare_response = job_service.Prepare(
         beam_job_api_pb2.PrepareJobRequest(
-            job_name='job', pipeline=proto_pipeline))
+            job_name='job', pipeline=proto_pipeline,
+            pipeline_options=job_utils.dict_to_struct(options)))
     if prepare_response.artifact_staging_endpoint.url:
       stager = portable_stager.PortableStager(
           grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
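
For readers of the diff: on the wire, the Python runner now sends PrepareJobRequest.pipeline_options as a protobuf Struct whose field names follow the same beam:option:<name>:v1 scheme consumed by PipelineOptionsTranslation above. Below is a minimal Java sketch of an equivalent Struct, with hypothetical option values chosen only for illustration; it is not code from this PR.

import com.google.protobuf.Struct;
import com.google.protobuf.Value;

public class PipelineOptionsStructSketch {
  public static void main(String[] args) {
    // Roughly the shape of what dict_to_struct({'beam:option:job_name:v1': 'job', ...})
    // yields on the Python side; None-valued options are filtered out before conversion.
    Struct options =
        Struct.newBuilder()
            .putFields("beam:option:job_name:v1",
                Value.newBuilder().setStringValue("job").build())
            .putFields("beam:option:streaming:v1",
                Value.newBuilder().setBoolValue(true).build())
            .build();
    System.out.println(options);
  }
}

On the Java side, PipelineOptionsTranslation.fromProto(options) strips these keys back to camelCase before Jackson binds them to a PipelineOptions instance.
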

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120982)
    Time Spent: 1h 20m  (was: 1h 10m)

> Python portable runner to pass pipeline options to job service
> --------------------------------------------------------------
>
>                 Key: BEAM-4733
>                 URL: https://issues.apache.org/jira/browse/BEAM-4733
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The portable runner client currently drops the pipeline options. In the 
> prototype, it was implemented here: 
> [https://github.com/bsidhom/beam/commit/ce8a79122b98a7cbcf6fea7db4a5fe31b6e8248a#diff-f2f54a3d6ae6ef6a22e5d52621a133ed]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
