[ 
https://issues.apache.org/jira/browse/BEAM-5637?focusedWorklogId=154020&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154020
 ]

ASF GitHub Bot logged work on BEAM-5637:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Oct/18 20:42
            Start Date: 12/Oct/18 20:42
    Worklog Time Spent: 10m 
      Work Description: HuangLED closed pull request #6667: [BEAM-5637] Python support for custom dataflow worker jar
URL: https://github.com/apache/beam/pull/6667
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index a172535b100..2c061e0ec52 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -674,7 +674,12 @@ def _add_argparse_args(cls, parser):
          'job submission, the files will be staged in the staging area '
          '(--staging_location option) and the workers will install them in '
          'same order they were specified on the command line.'))
-
+    parser.add_argument(
+        '--dataflow_worker_jar',
+        dest='dataflow_worker_jar',
+        type=str,
+        help='Dataflow worker jar.'
+    )
 
 class PortableOptions(PipelineOptions):
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1acd3488524..5be60bd701b 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -381,6 +381,12 @@ def run_pipeline(self, pipeline):
     self.dataflow_client = apiclient.DataflowApplicationClient(
         pipeline._options)
 
+    if setup_options.dataflow_worker_jar:
+      experiments = ["use_staged_dataflow_worker_jar"]
+      if debug_options.experiments is not None:
+        experiments = list(set(experiments + debug_options.experiments))
+      debug_options.experiments = experiments
+
     # Create the job description and send a request to the service. The result
     # can be None if there is no need to send a request to the service (e.g.
     # template creation). If a request was sent and failed then the call will
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index ef7401ac6aa..e336fd3f9b9 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -123,8 +123,7 @@ def stage_job_resources(self,
 
         Returns:
           A list of file names (no paths) for the resources staged. All the
-          files
-          are assumed to be staged at staging_location.
+          files are assumed to be staged at staging_location.
 
         Raises:
           RuntimeError: If files specified are not found or error encountered
@@ -256,6 +255,13 @@ def stage_job_resources(self,
                 'The file "%s" cannot be found. Its location was specified by '
                 'the --sdk_location command-line option.' % sdk_path)
 
+    if hasattr(setup_options, 'dataflow_worker_jar') and \
+        setup_options.dataflow_worker_jar:
+      jar_staged_filename = 'dataflow-worker.jar'
+      staged_path = FileSystems.join(staging_location, jar_staged_filename)
+      self.stage_artifact(setup_options.dataflow_worker_jar, staged_path)
+      resources.append(jar_staged_filename)
+
     # Delete all temp files created while staging job resources.
     shutil.rmtree(temp_dir)
     retrieval_token = self.commit_manifest()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154020)
    Time Spent: 1.5h  (was: 1h 20m)

> Python support for custom dataflow worker jar
> ---------------------------------------------
>
>                 Key: BEAM-5637
>                 URL: https://issues.apache.org/jira/browse/BEAM-5637
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Henning Rohde
>            Assignee: Ruoyun Huang
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> One of the slightly subtle aspects is that we would need to ignore one of the 
> staged jars for portable Python jobs. That requires a change to the Python 
> boot code: 
> https://github.com/apache/beam/blob/66d7c865b7267f388ee60752891a9141fad43774/sdks/python/container/boot.go#L104



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to