[ 
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=117109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117109
 ]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jun/18 22:58
            Start Date: 28/Jun/18 22:58
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #5817: [BEAM-3883] Stage 
files in the portability runner.
URL: https://github.com/apache/beam/pull/5817
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 784166cafda..ccce9a99ef7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -442,10 +442,11 @@ def _stage_resources(self, options):
       raise RuntimeError('The --temp_location option must be specified.')
 
     resource_stager = _LegacyDataflowStager(self)
-    return resource_stager.stage_job_resources(
+    _, resources = resource_stager.stage_job_resources(
         options,
         temp_dir=tempfile.mkdtemp(),
         staging_location=google_cloud_options.staging_location)
+    return resources
 
   def stage_file(self, gcs_or_local_path, file_name, stream,
                  mime_type='application/octet-stream'):
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 0da33fa7f4a..26376c93337 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -24,12 +24,11 @@
 from apache_beam import metrics
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_artifact_api_pb2
-from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
 from apache_beam.runners import pipeline_context
 from apache_beam.runners import runner
+from apache_beam.runners.portability import portable_stager
 
 __all__ = ['PortableRunner']
 
@@ -92,16 +91,12 @@ def run_pipeline(self, pipeline):
         beam_job_api_pb2.PrepareJobRequest(
             job_name='job', pipeline=proto_pipeline))
     if prepare_response.artifact_staging_endpoint.url:
-      # Must commit something to get a retrieval token,
-      # committing empty manifest for now.
-      # TODO(BEAM-3883): Actually stage required files.
-      artifact_service = beam_artifact_api_pb2_grpc.ArtifactStagingServiceStub(
-          
grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url))
-      commit_manifest = artifact_service.CommitManifest(
-          beam_artifact_api_pb2.CommitManifestRequest(
-              manifest=beam_artifact_api_pb2.Manifest(),
-              staging_session_token=prepare_response.staging_session_token))
-      retrieval_token = commit_manifest.retrieval_token
+      stager = portable_stager.PortableStager(
+          
grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
+          prepare_response.staging_session_token)
+      retrieval_token, _ = stager.stage_job_resources(
+          pipeline._options,
+          staging_location='')
     else:
       retrieval_token = None
     run_response = job_service.Run(
diff --git a/sdks/python/apache_beam/runners/portability/portable_stager.py 
b/sdks/python/apache_beam/runners/portability/portable_stager.py
index f556811425b..3761373fb42 100644
--- a/sdks/python/apache_beam/runners/portability/portable_stager.py
+++ b/sdks/python/apache_beam/runners/portability/portable_stager.py
@@ -20,6 +20,8 @@
 from __future__ import division
 from __future__ import print_function
 
+import base64
+import hashlib
 import os
 
 from apache_beam.portability.api import beam_artifact_api_pb2
@@ -69,7 +71,8 @@ def stage_artifact(self, local_path_to_artifact, 
artifact_name):
 
     def artifact_request_generator():
       artifact_metadata = beam_artifact_api_pb2.ArtifactMetadata(
-          name=artifact_name)
+          name=artifact_name,
+          md5=_get_file_hash(local_path_to_artifact))
       metadata = beam_artifact_api_pb2.PutArtifactMetadata(
           staging_session_token=self._staging_session_token,
           metadata=artifact_metadata)
@@ -90,7 +93,18 @@ def artifact_request_generator():
   def commit_manifest(self):
     manifest = beam_artifact_api_pb2.Manifest(artifact=self._artifacts)
     self._artifacts = []
-    self._artifact_staging_stub.CommitManifest(
+    return self._artifact_staging_stub.CommitManifest(
         beam_artifact_api_pb2.CommitManifestRequest(
             manifest=manifest,
-            staging_session_token=self._staging_session_token))
+            staging_session_token=self._staging_session_token)).retrieval_token
+
+
+def _get_file_hash(path):
+  hasher = hashlib.md5()
+  with open(path) as f:
+    while True:
+      chunk = f.read(1 << 21)
+      if chunk:
+        hasher.update(chunk)
+      else:
+        return base64.b64encode(hasher.digest())
diff --git a/sdks/python/apache_beam/runners/portability/stager.py 
b/sdks/python/apache_beam/runners/portability/stager.py
index a6bfa497bd5..e938970404a 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -219,6 +219,10 @@ def stage_job_resources(self,
         resources.extend(
             self._stage_beam_sdk(sdk_remote_location, staging_location,
                                  temp_dir))
+      elif setup_options.sdk_location == 'container':
+        # Use the SDK that's built into the container, rather than re-staging
+        # it.
+        pass
       else:
         # This branch is also used by internal tests running with the SDK built
         # at head.
@@ -252,8 +256,8 @@ def stage_job_resources(self,
 
     # Delete all temp files created while staging job resources.
     shutil.rmtree(temp_dir)
-    self.commit_manifest()
-    return resources
+    retrieval_token = self.commit_manifest()
+    return retrieval_token, resources
 
   @staticmethod
   def _download_file(from_url, to_path):
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py 
b/sdks/python/apache_beam/runners/portability/stager_test.py
index 56b57d16796..f6ee55b5249 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -133,7 +133,7 @@ def test_no_main_session(self):
 
     self.assertEqual([],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
 
   def test_with_main_session(self):
     staging_dir = self.make_temp_dir()
@@ -144,7 +144,7 @@ def test_with_main_session(self):
 
     self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
     self.assertTrue(
         os.path.isfile(
             os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
@@ -156,7 +156,7 @@ def test_default_resources(self):
 
     self.assertEqual([],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
 
   def test_with_requirements_file(self):
     staging_dir = self.make_temp_dir()
@@ -176,7 +176,7 @@ def test_with_requirements_file(self):
             self.stager.stage_job_resources(
                 options,
                 populate_requirements_cache=self.populate_requirements_cache,
-                staging_location=staging_dir)))
+                staging_location=staging_dir)[1]))
     self.assertTrue(
         os.path.isfile(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
@@ -214,7 +214,7 @@ def test_with_requirements_file_and_cache(self):
             self.stager.stage_job_resources(
                 options,
                 populate_requirements_cache=self.populate_requirements_cache,
-                staging_location=staging_dir)))
+                staging_location=staging_dir)[1]))
     self.assertTrue(
         os.path.isfile(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
@@ -244,7 +244,7 @@ def test_with_setup_file(self):
                 os.path.join(source_dir, stager.WORKFLOW_TARBALL_FILE)
             ],
             temp_dir=source_dir,
-            staging_location=staging_dir))
+            staging_location=staging_dir)[1])
     self.assertTrue(
         os.path.isfile(os.path.join(staging_dir, 
stager.WORKFLOW_TARBALL_FILE)))
 
@@ -287,7 +287,7 @@ def test_sdk_location_default(self):
     with mock.patch(
         'apache_beam.utils.processes.check_call',
         self.build_fake_pip_download_command_handler(has_wheels=False)):
-      staged_resources = self.stager.stage_job_resources(
+      _, staged_resources = self.stager.stage_job_resources(
           options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
     self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE], staged_resources)
@@ -305,7 +305,7 @@ def test_sdk_location_default_with_wheels(self):
     with mock.patch(
         'apache_beam.utils.processes.check_call',
         self.build_fake_pip_download_command_handler(has_wheels=True)):
-      staged_resources = self.stager.stage_job_resources(
+      _, staged_resources = self.stager.stage_job_resources(
           options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
       self.assertEqual(len(staged_resources), 2)
@@ -329,7 +329,7 @@ def test_sdk_location_local_directory(self):
 
     self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
     tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
@@ -347,7 +347,7 @@ def test_sdk_location_local_source_file(self):
 
     self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
     tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
@@ -365,7 +365,7 @@ def test_sdk_location_local_wheel_file(self):
 
     self.assertEqual([sdk_filename],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
     tarball_path = os.path.join(staging_dir, sdk_filename)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
@@ -399,7 +399,7 @@ def test_sdk_location_remote_source_file(self, 
*unused_mocks):
 
     self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
                      self.stager.stage_job_resources(
-                         options, staging_location=staging_dir))
+                         options, staging_location=staging_dir)[1])
 
   @mock.patch(
       'apache_beam.runners.portability.stager_test.TestStager.stage_artifact')
@@ -425,7 +425,7 @@ def is_remote_path(path):
         '.stager.Stager._is_remote_path', staticmethod(is_remote_path)):
       self.assertEqual([sdk_filename],
                        self.stager.stage_job_resources(
-                           options, staging_location=staging_dir))
+                           options, staging_location=staging_dir)[1])
 
   def test_sdk_location_http(self):
     staging_dir = self.make_temp_dir()
@@ -445,7 +445,7 @@ def file_download(_, to_path):
         '.stager.Stager._download_file', staticmethod(file_download)):
       self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
                        self.stager.stage_job_resources(
-                           options, staging_location=staging_dir))
+                           options, staging_location=staging_dir)[1])
 
     tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
@@ -500,7 +500,7 @@ def file_copy(from_path, to_path):
             'abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl',
             'remote_file.tar.gz', stager.EXTRA_PACKAGES_FILE
         ], self.stager.stage_job_resources(
-            options, staging_location=staging_dir))
+            options, staging_location=staging_dir)[1])
     with open(os.path.join(staging_dir, stager.EXTRA_PACKAGES_FILE)) as f:
       self.assertEqual([
           'abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n', 'whl.whl\n',


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 117109)
    Time Spent: 19h 40m  (was: 19.5h)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>             Fix For: 2.6.0
>
>          Time Spent: 19h 40m
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or 
> dependencies when talking to the job API. Artifacts that need to be staged 
> include the user code itself, any SDK components not included in the 
> container image, and the list of Python packages that must be installed at 
> runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: 
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52.]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to