[ 
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=101958&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101958
 ]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/May/18 01:11
            Start Date: 15/May/18 01:11
    Worklog Time Spent: 10m 
      Work Description: tvalentyn commented on a change in pull request #5251: 
[BEAM-3883] Refactor and clean dependency.py to make it reusable with artifact 
service
URL: https://github.com/apache/beam/pull/5251#discussion_r188141421
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/stager.py
 ##########
 @@ -0,0 +1,573 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other standard libraries. Beam relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/an_example_pypi_project/setuptools.html
+
+When a runner tries to run a pipeline it will check for a --requirements_file
+and a --setup_file option.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools and
+it will run 'python setup.py sdist' to produce a source distribution. The
+resulting tarball (a .tar or .tar.gz file) will be staged at the staging
+location specified as job option. When a worker starts it will check for the
+presence of this file and will run 'easy_install tarball' to install the
+package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the staging location.  When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze -r
+requirements.txt'. The reason a runner does not run this automatically is
+because quite often only a small fraction of the dependencies present in a
+requirements.txt file are actually needed for remote execution and therefore a
+one-time manual trimming is desirable.
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+import functools
+import glob
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+import pkg_resources
+
+from apache_beam.internal import pickler
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import SetupOptions
+# TODO(angoenka): Remove reference to dataflow internal names
+from apache_beam.runners.dataflow.internal import names
+from apache_beam.utils import processes
+
+# All constants are for internal use only; no backwards-compatibility
+# guarantees.
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+# Package names for distributions
+BEAM_PACKAGE_NAME = 'apache-beam'
+
+
+class Stager(object):
+  """Stager identifies and copies the appropriate artifacts to the staging
+  location."""
+
+  def _copy_file(self, from_path, to_path):
+    """Copies a local file to a GCS file or vice versa."""
+    logging.info('file copy from %s to %s.', from_path, to_path)
+    if from_path.startswith('gs://') or to_path.startswith('gs://'):
+      from apache_beam.io.gcp import gcsio
+      if from_path.startswith('gs://') and to_path.startswith('gs://'):
+        # Both files are GCS files so copy.
+        gcsio.GcsIO().copy(from_path, to_path)
+      elif to_path.startswith('gs://'):
+        # Only target is a GCS file, read local file and upload.
+        with open(from_path, 'rb') as f:
+          with gcsio.GcsIO().open(to_path, mode='wb') as g:
+            pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
+            for chunk in iter(pfun, ''):
+              g.write(chunk)
+      else:
+        # Source is a GCS file but target is local file.
+        with gcsio.GcsIO().open(from_path, mode='rb') as g:
+          with open(to_path, 'wb') as f:
+            pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
+            for chunk in iter(pfun, ''):
+              f.write(chunk)
+    else:
+      # Branch used only for unit tests and integration tests.
+      # In such environments GCS support is not available.
+      if not os.path.isdir(os.path.dirname(to_path)):
+        logging.info(
+            'Created folder (since we have not done yet, and any errors '
+            'will follow): %s ', os.path.dirname(to_path))
+        os.mkdir(os.path.dirname(to_path))
+      shutil.copyfile(from_path, to_path)
+
+  def _download_file(self, from_url, to_path):
+    """Downloads a file over http/https from a URL, or copies it from a
+        remote path to a local path."""
+    if from_url.startswith('http://') or from_url.startswith('https://'):
+      # TODO(silviuc): We should cache downloads so we do not do it for every
+      # job.
+      try:
+        # We check if the file is actually there because wget returns a file
+        # even for a 404 response (file will contain the contents of the 404
+        # response).
+        # TODO(angoenka): Extract and use the filename when downloading file.
+        response, content = __import__('httplib2').Http().request(from_url)
+        if int(response['status']) >= 400:
+          raise RuntimeError(
+              'Artifact not found at %s (response: %s)' % (from_url, response))
+        with open(to_path, 'w') as f:
+          f.write(content)
+      except Exception:
+        logging.info('Failed to download Artifact from %s', from_url)
+        raise
+    else:
+      # Copy the file from the remote file system to the local file system.
+      self._copy_file(from_url, to_path)
+
+  def _is_remote_path(self, path):
 
 Review comment:
   <!--new_thread; commit:fb22e20e257e1c00a618286cb0a1dd01e4071709; 
resolved:0-->
   Let's make _is_remote_path a plain function in this module, since it does 
not need to access the state of the Stager. We can still monkey-patch it in 
tests if needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101958)
    Time Spent: 8h  (was: 7h 50m)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or 
> dependencies when talking to the job API. Artifacts that need to be staged 
> include the user code itself, any SDK components not included in the 
> container image, and the list of Python packages that must be installed at 
> runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: 
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to