[ https://issues.apache.org/jira/browse/BEAM-5637?focusedWorklogId=154020&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154020 ]
ASF GitHub Bot logged work on BEAM-5637: ---------------------------------------- Author: ASF GitHub Bot Created on: 12/Oct/18 20:42 Start Date: 12/Oct/18 20:42 Worklog Time Spent: 10m Work Description: HuangLED closed pull request #6667: [BEAM-5637] Python support for custom dataflow worker jar URL: https://github.com/apache/beam/pull/6667 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index a172535b100..2c061e0ec52 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -674,7 +674,12 @@ def _add_argparse_args(cls, parser): 'job submission, the files will be staged in the staging area ' '(--staging_location option) and the workers will install them in ' 'same order they were specified on the command line.')) - + parser.add_argument( + '--dataflow_worker_jar', + dest='dataflow_worker_jar', + type=str, + help='Dataflow worker jar.' + ) class PortableOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 1acd3488524..5be60bd701b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -381,6 +381,12 @@ def run_pipeline(self, pipeline): self.dataflow_client = apiclient.DataflowApplicationClient( pipeline._options) + if setup_options.dataflow_worker_jar: + experiments = ["use_staged_dataflow_worker_jar"] + if debug_options.experiments is not None: + experiments = list(set(experiments + debug_options.experiments)) + debug_options.experiments = experiments + # Create the job description and send a request to the service. The result # can be None if there is no need to send a request to the service (e.g. # template creation). If a request was sent and failed then the call will diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index ef7401ac6aa..e336fd3f9b9 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -123,8 +123,7 @@ def stage_job_resources(self, Returns: A list of file names (no paths) for the resources staged. All the - files - are assumed to be staged at staging_location. + files are assumed to be staged at staging_location. Raises: RuntimeError: If files specified are not found or error encountered @@ -256,6 +255,13 @@ def stage_job_resources(self, 'The file "%s" cannot be found. Its location was specified by ' 'the --sdk_location command-line option.' % sdk_path) + if hasattr(setup_options, 'dataflow_worker_jar') and \ + setup_options.dataflow_worker_jar: + jar_staged_filename = 'dataflow-worker.jar' + staged_path = FileSystems.join(staging_location, jar_staged_filename) + self.stage_artifact(setup_options.dataflow_worker_jar, staged_path) + resources.append(jar_staged_filename) + # Delete all temp files created while staging job resources. shutil.rmtree(temp_dir) retrieval_token = self.commit_manifest() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 154020) Time Spent: 1.5h (was: 1h 20m) > Python support for custom dataflow worker jar > --------------------------------------------- > > Key: BEAM-5637 > URL: https://issues.apache.org/jira/browse/BEAM-5637 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Reporter: Henning Rohde > Assignee: Ruoyun Huang > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > One of the slightly subtle aspects is that we would need to ignore one of the > staged jars for portable Python jobs. That requires a change to the Python > boot code: > https://github.com/apache/beam/blob/66d7c865b7267f388ee60752891a9141fad43774/sdks/python/container/boot.go#L104 -- This message was sent by Atlassian JIRA (v7.6.3#76005)