[ https://issues.apache.org/jira/browse/BEAM-4733?focusedWorklogId=120982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120982 ]
ASF GitHub Bot logged work on BEAM-4733: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/Jul/18 19:42 Start Date: 09/Jul/18 19:42 Worklog Time Spent: 10m Work Description: tweise closed pull request #5888: [BEAM-4733] Pass pipeline options from Python portable runner to job server. URL: https://github.com/apache/beam/pull/5888 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java index 4cdca616308..efe61e7d389 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java @@ -18,14 +18,24 @@ package org.apache.beam.runners.core.construction; +import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.CaseFormat; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Struct; import com.google.protobuf.util.JsonFormat; import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; -/** Utilities for going to/from Runner API pipeline options. */ +/** + * Utilities for going to/from Runner API pipeline options. + * + * <p>TODO: Make this the default to/from translation for PipelineOptions. 
+ */ public class PipelineOptionsTranslation { private static final ObjectMapper MAPPER = new ObjectMapper() @@ -34,10 +44,26 @@ /** Converts the provided {@link PipelineOptions} to a {@link Struct}. */ public static Struct toProto(PipelineOptions options) { Struct.Builder builder = Struct.newBuilder(); + try { + // TODO: Officially define URNs for options and their scheme. + TreeNode treeNode = MAPPER.valueToTree(options); + TreeNode rootOptions = treeNode.get("options"); + Iterator<String> optionsKeys = rootOptions.fieldNames(); + Map<String, TreeNode> optionsUsingUrns = new HashMap<>(); + while (optionsKeys.hasNext()) { + String optionKey = optionsKeys.next(); + TreeNode optionValue = rootOptions.get(optionKey); + optionsUsingUrns.put( + "beam:option:" + + CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, optionKey) + + ":v1", + optionValue); + } + // The JSON format of a Protobuf Struct is the JSON object that is equivalent to that struct // (with values encoded in a standard json-codeable manner). See Beam PR 3719 for more. - JsonFormat.parser().merge(MAPPER.writeValueAsString(options), builder); + JsonFormat.parser().merge(MAPPER.writeValueAsString(optionsUsingUrns), builder); return builder.build(); } catch (IOException e) { throw new RuntimeException(e); @@ -46,6 +72,20 @@ public static Struct toProto(PipelineOptions options) { /** Converts the provided {@link Struct} into {@link PipelineOptions}. 
*/ public static PipelineOptions fromProto(Struct protoOptions) throws IOException { - return MAPPER.readValue(JsonFormat.printer().print(protoOptions), PipelineOptions.class); + Map<String, TreeNode> mapWithoutUrns = new HashMap<>(); + TreeNode rootOptions = MAPPER.readTree(JsonFormat.printer().print(protoOptions)); + Iterator<String> optionsKeys = rootOptions.fieldNames(); + while (optionsKeys.hasNext()) { + String optionKey = optionsKeys.next(); + TreeNode optionValue = rootOptions.get(optionKey); + mapWithoutUrns.put( + CaseFormat.LOWER_UNDERSCORE.to( + CaseFormat.LOWER_CAMEL, + optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())), + optionValue); + } + return MAPPER.readValue( + MAPPER.writeValueAsString(ImmutableMap.of("options", mapWithoutUrns)), + PipelineOptions.class); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java index adf4759733c..0faa1db2984 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java @@ -130,7 +130,9 @@ public void emptyStructDeserializes() throws Exception { public void structWithNullOptionsDeserializes() throws Exception { Struct serialized = Struct.newBuilder() - .putFields("options", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()) + .putFields( + "beam:option:option_key:v1", + Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()) .build(); PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index dd6c6a3ff58..348d99fface 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -450,6 +450,7 @@ private void translateImpulse( FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap(); Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap(); + Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap(); // order output names for deterministic mapping for (String localOutputName : new TreeMap<>(outputIndexMap).keySet()) { String collectionId = outputs.get(localOutputName); @@ -461,6 +462,7 @@ private void translateImpulse( tagsToOutputTags.put(tupleTag, new OutputTag<>(localOutputName, typeInformation)); tagsToCoders.put(tupleTag, windowCoder); tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName)); + collectionIdToTupleTag.put(collectionId, tupleTag); } final SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream; @@ -487,7 +489,7 @@ private void translateImpulse( // TODO: side inputs DoFnOperator<InputT, OutputT> doFnOperator = - new ExecutableStageDoFnOperator<InputT, OutputT>( + new ExecutableStageDoFnOperator<>( transform.getUniqueName(), inputCoder, mainOutputTag, @@ -498,7 +500,8 @@ private void translateImpulse( context.getPipelineOptions(), stagePayload, context.getJobInfo(), - FlinkExecutableStageContext.batchFactory()); + FlinkExecutableStageContext.batchFactory(), + collectionIdToTupleTag); outputStream = inputDataStream.transform(transform.getUniqueName(), outputTypeInformation, doFnOperator); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index fbd10edae1e..cf8e7345e1c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -20,7 +20,6 @@ import static org.apache.flink.util.Preconditions.checkState; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.logging.Logger; @@ -84,7 +83,8 @@ public ExecutableStageDoFnOperator( PipelineOptions options, RunnerApi.ExecutableStagePayload payload, JobInfo jobInfo, - FlinkExecutableStageContext.Factory contextFactory) { + FlinkExecutableStageContext.Factory contextFactory, + Map<String, TupleTag<?>> outputMap) { super( new NoOpDoFn(), stepName, @@ -101,19 +101,7 @@ public ExecutableStageDoFnOperator( this.payload = payload; this.jobInfo = jobInfo; this.contextFactory = contextFactory; - this.outputMap = createOutputMap(mainOutputTag, additionalOutputTags); - } - - private static Map<String, TupleTag<?>> createOutputMap( - TupleTag mainOutput, List<TupleTag<?>> additionalOutputs) { - Map<String, TupleTag<?>> outputMap = new HashMap<>(additionalOutputs.size() + 1); - if (mainOutput != null) { - outputMap.put(mainOutput.getId(), mainOutput); - } - for (TupleTag<?> additionalTag : additionalOutputs) { - outputMap.put(additionalTag.getId(), additionalTag); - } - return outputMap; + this.outputMap = outputMap; } @Override diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java index 9136eb5cb90..33cafada639 100644 --- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java @@ -32,7 +32,9 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Struct; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; @@ -325,7 +327,8 @@ public void testSerialization() { PipelineOptionsFactory.as(FlinkPipelineOptions.class), stagePayload, jobInfo, - FlinkExecutableStageContext.batchFactory()); + FlinkExecutableStageContext.batchFactory(), + createOutputMap(mainOutput, ImmutableList.of(additionalOutput))); ExecutableStageDoFnOperator<Integer, Integer> clone = SerializationUtils.clone(operator); assertNotNull(clone); @@ -358,8 +361,21 @@ public void testSerialization() { PipelineOptionsFactory.as(FlinkPipelineOptions.class), stagePayload, jobInfo, - contextFactory); + contextFactory, + createOutputMap(mainOutput, additionalOutputs)); return operator; } + + private static Map<String, TupleTag<?>> createOutputMap( + TupleTag mainOutput, List<TupleTag<?>> additionalOutputs) { + Map<String, TupleTag<?>> outputMap = new HashMap<>(additionalOutputs.size() + 1); + if (mainOutput != null) { + outputMap.put(mainOutput.getId(), mainOutput); + } + for (TupleTag<?> additionalTag : additionalOutputs) { + outputMap.put(additionalTag.getId(), additionalTag); + } + return outputMap; + } } diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 9ad35378f39..d2e6b91c9f8 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ 
b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -30,6 +30,7 @@ from apache_beam.portability.api import beam_job_api_pb2_grpc from apache_beam.runners import pipeline_context from apache_beam.runners import runner +from apache_beam.runners.job import utils as job_utils from apache_beam.runners.portability import portable_stager __all__ = ['PortableRunner'] @@ -100,11 +101,17 @@ def run_pipeline(self, pipeline): del proto_pipeline.components.transforms[sub_transform] del transform_proto.subtransforms[:] + # TODO: Define URNs for options. + options = {'beam:option:' + k + ':v1': v + for k, v in pipeline._options.get_all_options().iteritems() + if v is not None} + job_service = beam_job_api_pb2_grpc.JobServiceStub( grpc.insecure_channel(job_endpoint)) prepare_response = job_service.Prepare( beam_job_api_pb2.PrepareJobRequest( - job_name='job', pipeline=proto_pipeline)) + job_name='job', pipeline=proto_pipeline, + pipeline_options=job_utils.dict_to_struct(options))) if prepare_response.artifact_staging_endpoint.url: stager = portable_stager.PortableStager( grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url), ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 120982)
Time Spent: 1h 20m (was: 1h 10m)

> Python portable runner to pass pipeline options to job service
> --------------------------------------------------------------
>
> Key: BEAM-4733
> URL: https://issues.apache.org/jira/browse/BEAM-4733
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Labels: portability
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The portable runner client currently drops the pipeline options. In the
> prototype, it was implemented here:
> [https://github.com/bsidhom/beam/commit/ce8a79122b98a7cbcf6fea7db4a5fe31b6e8248a#diff-f2f54a3d6ae6ef6a22e5d52621a133ed]
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)