Repository: beam
Updated Branches:
  refs/heads/master 8ec890909 -> 00b395881
Move dependency file to dataflow runner directory Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/821669da Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/821669da Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/821669da Branch: refs/heads/master Commit: 821669da21d2d785977428e1ecb41c641dda9852 Parents: 8ec8909 Author: Ahmet Altay <al...@google.com> Authored: Wed Feb 22 19:08:16 2017 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Wed Feb 22 19:36:04 2017 -0800 ---------------------------------------------------------------------- .../runners/dataflow/internal/apiclient.py | 6 +- .../runners/dataflow/internal/dependency.py | 508 +++++++++++++++++++ .../dataflow/internal/dependency_test.py | 425 ++++++++++++++++ sdks/python/apache_beam/utils/dependency.py | 504 ------------------ .../python/apache_beam/utils/dependency_test.py | 425 ---------------- sdks/python/apache_beam/utils/profiler.py | 12 +- 6 files changed, 942 insertions(+), 938 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 0dab676..481ab70 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -34,13 +34,13 @@ from apache_beam import utils from apache_beam.internal.auth import get_service_credentials from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.gcp.internal.clients import storage +from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal.clients import dataflow +from apache_beam.runners.dataflow.internal.dependency import get_required_container_version +from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version from apache_beam.transforms import cy_combiners from apache_beam.transforms.display import DisplayData -from apache_beam.utils import dependency from apache_beam.utils import retry -from apache_beam.utils.dependency import get_required_container_version -from apache_beam.utils.dependency import get_sdk_name_and_version from apache_beam.utils.names import PropertyNames from apache_beam.utils.pipeline_options import DebugOptions from apache_beam.utils.pipeline_options import GoogleCloudOptions http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py new file mode 100644 index 0000000..6024332 --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -0,0 +1,508 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Support for installing custom code and required dependencies. + +Workflows, with the exception of very simple ones, are organized in multiple +modules and packages. Typically, these modules and packages have +dependencies on other standard libraries. Dataflow relies on the Python +setuptools package to handle these scenarios. For further details please read: +https://pythonhosted.org/an_example_pypi_project/setuptools.html + +When a runner tries to run a pipeline it will check for a --requirements_file +and a --setup_file option. + +If --setup_file is present then it is assumed that the folder containing the +file specified by the option has the typical layout required by setuptools and +it will run 'python setup.py sdist' to produce a source distribution. The +resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging +location specified as job option. When a worker starts it will check for the +presence of this file and will run 'easy_install tarball' to install the +package in the worker. + +If --requirements_file is present then the file specified by the option will be +staged in the GCS staging location. When a worker starts it will check for the +presence of this file and will run 'pip install -r requirements.txt'. A +requirements file can be easily generated by running 'pip freeze -r +requirements.txt'. The reason a Dataflow runner does not run this automatically +is because quite often only a small fraction of the dependencies present in a +requirements.txt file are actually needed for remote execution and therefore a +one-time manual trimming is desirable. + +TODO(silviuc): Staged files should have a job specific prefix. +To prevent several jobs in the same project stomping on each other due to a +shared staging location. + +TODO(silviuc): Should we allow several setup packages? +TODO(silviuc): We should allow customizing the exact command for setup build. +""" + +import glob +import logging +import os +import re +import shutil +import sys +import tempfile + + +from apache_beam import utils +from apache_beam import version as beam_version +from apache_beam.internal import pickler +from apache_beam.utils import names +from apache_beam.utils import processes +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import SetupOptions + + +# Update this version to the next version whenever there is a change that will +# require changes to the execution environment. +BEAM_CONTAINER_VERSION = 'beamhead' + +# Standard file names used for staging files. 
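As a rough illustration of the staging workflow described in the docstring above, a pipeline opts into this logic purely through command-line style options; the runner, project, bucket and file names below are hypothetical and only sketch the intended usage:

    from apache_beam.utils.pipeline_options import PipelineOptions

    # Placeholder values; substitute a real project and GCS bucket layout.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--staging_location=gs://my-bucket/staging',
        '--temp_location=gs://my-bucket/temp',
        '--requirements_file=requirements.txt',  # staged, then pip-installed on workers
        '--setup_file=./setup.py',               # built with 'python setup.py sdist' and staged
        '--extra_package=dist/my_package-0.1.tar.gz',  # additional local tarball to stage
    ])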
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz' +REQUIREMENTS_FILE = 'requirements.txt' +EXTRA_PACKAGES_FILE = 'extra_packages.txt' + +GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow' + + +def _dependency_file_copy(from_path, to_path): + """Copies a local file to a GCS file or vice versa.""" + logging.info('file copy from %s to %s.', from_path, to_path) + if from_path.startswith('gs://') or to_path.startswith('gs://'): + command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path] + logging.info('Executing command: %s', command_args) + processes.check_call(command_args) + else: + # Branch used only for unit tests and integration tests. + # In such environments GCS support is not available. + if not os.path.isdir(os.path.dirname(to_path)): + logging.info('Created folder (since we have not done yet, and any errors ' + 'will follow): %s ', os.path.dirname(to_path)) + os.mkdir(os.path.dirname(to_path)) + shutil.copyfile(from_path, to_path) + + +def _dependency_file_download(from_url, to_folder): + """Downloads a file from a URL and returns path to the local file.""" + # TODO(silviuc): We should cache downloads so we do not do it for every job. + try: + # We check if the file is actually there because wget returns a file + # even for a 404 response (file will contain the contents of the 404 + # response). + response, content = __import__('httplib2').Http().request(from_url) + if int(response['status']) >= 400: + raise RuntimeError( + 'Dataflow SDK not found at %s (response: %s)' % (from_url, response)) + local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz') + with open(local_download_file, 'w') as f: + f.write(content) + except Exception: + logging.info('Failed to download SDK from %s', from_url) + raise + return local_download_file + + +def _stage_extra_packages(extra_packages, staging_location, temp_dir, + file_copy=_dependency_file_copy): + """Stages a list of local extra packages. + + Args: + extra_packages: Ordered list of local paths to extra packages to be staged. + staging_location: Staging location for the packages. + temp_dir: Temporary folder where the resource building can happen. Caller + is responsible for cleaning up this folder after this function returns. + file_copy: Callable for copying files. The default version will copy from + a local file to a GCS location using the gsutil tool available in the + Google Cloud SDK package. + + Returns: + A list of file names (no paths) for the resources staged. All the files + are assumed to be staged in staging_location. + + Raises: + RuntimeError: If files specified are not found or do not have expected + name patterns. + """ + resources = [] + staging_temp_dir = None + local_packages = [] + for package in extra_packages: + if not (os.path.basename(package).endswith('.tar') or + os.path.basename(package).endswith('.tar.gz') or + os.path.basename(package).endswith('.whl')): + raise RuntimeError( + 'The --extra_package option expects a full path ending with ' + '".tar" or ".tar.gz" instead of %s' % package) + if os.path.basename(package).endswith('.whl'): + logging.warning( + 'The .whl package "%s" is provided in --extra_package. ' + 'This functionality is not officially supported. Since wheel ' + 'packages are binary distributions, this package must be ' + 'binary-compatible with the worker environment (e.g. 
Python 2.7 ' + 'running on an x64 Linux host).') + + if not os.path.isfile(package): + if package.startswith('gs://'): + if not staging_temp_dir: + staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) + logging.info('Downloading extra package: %s locally before staging', + package) + _dependency_file_copy(package, staging_temp_dir) + else: + raise RuntimeError( + 'The file %s cannot be found. It was specified in the ' + '--extra_packages command line option.' % package) + else: + local_packages.append(package) + + if staging_temp_dir: + local_packages.extend( + [utils.path.join(staging_temp_dir, f) for f in os.listdir( + staging_temp_dir)]) + + for package in local_packages: + basename = os.path.basename(package) + staged_path = utils.path.join(staging_location, basename) + file_copy(package, staged_path) + resources.append(basename) + # Create a file containing the list of extra packages and stage it. + # The file is important so that in the worker the packages are installed + # exactly in the order specified. This approach will avoid extra PyPI + # requests. For example if package A depends on package B and package A + # is installed first then the installer will try to satisfy the + # dependency on B by downloading the package from PyPI. If package B is + # installed first this is avoided. + with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f: + for package in local_packages: + f.write('%s\n' % os.path.basename(package)) + staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE) + # Note that the caller of this function is responsible for deleting the + # temporary folder where all temp files are created, including this one. + file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path) + resources.append(EXTRA_PACKAGES_FILE) + + return resources + + +def _get_python_executable(): + # Allow overriding the python executable to use for downloading and + # installing dependencies, otherwise use the python executable for + # the current process. + python_bin = os.environ.get('BEAM_PYTHON') or sys.executable + if not python_bin: + raise ValueError('Could not find Python executable.') + return python_bin + + +def _populate_requirements_cache(requirements_file, cache_dir): + # The 'pip download' command will not download again if it finds the + # tarball with the proper version already present. + # It will get the packages downloaded in the order they are presented in + # the requirements file and will not download package dependencies. + cmd_args = [ + _get_python_executable(), '-m', 'pip', 'install', '--download', cache_dir, + '-r', requirements_file, + # Download from PyPI source distributions. + '--no-binary', ':all:'] + logging.info('Executing command: %s', cmd_args) + processes.check_call(cmd_args) + + +def stage_job_resources( + options, file_copy=_dependency_file_copy, build_setup_args=None, + temp_dir=None, populate_requirements_cache=_populate_requirements_cache): + """Creates (if needed) and stages job resources to options.staging_location. + + Args: + options: Command line options. More specifically the function will expect + staging_location, requirements_file, setup_file, and save_main_session + options to be present. + file_copy: Callable for copying files. The default version will copy from + a local file to a GCS location using the gsutil tool available in the + Google Cloud SDK package. + build_setup_args: A list of command line arguments used to build a setup + package. Used only if options.setup_file is not None. Used only for + testing. 
+ temp_dir: Temporary folder where the resource building can happen. If None + then a unique temp directory will be created. Used only for testing. + populate_requirements_cache: Callable for populating the requirements cache. + Used only for testing. + + Returns: + A list of file names (no paths) for the resources staged. All the files + are assumed to be staged in options.staging_location. + + Raises: + RuntimeError: If files specified are not found or error encountered while + trying to create the resources (e.g., build a setup package). + """ + temp_dir = temp_dir or tempfile.mkdtemp() + resources = [] + + google_cloud_options = options.view_as(GoogleCloudOptions) + setup_options = options.view_as(SetupOptions) + # Make sure that all required options are specified. There are a few that have + # defaults to support local running scenarios. + if google_cloud_options.staging_location is None: + raise RuntimeError( + 'The --staging_location option must be specified.') + if google_cloud_options.temp_location is None: + raise RuntimeError( + 'The --temp_location option must be specified.') + + # Stage a requirements file if present. + if setup_options.requirements_file is not None: + if not os.path.isfile(setup_options.requirements_file): + raise RuntimeError('The file %s cannot be found. It was specified in the ' + '--requirements_file command line option.' % + setup_options.requirements_file) + staged_path = utils.path.join(google_cloud_options.staging_location, + REQUIREMENTS_FILE) + file_copy(setup_options.requirements_file, staged_path) + resources.append(REQUIREMENTS_FILE) + requirements_cache_path = ( + os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') + if setup_options.requirements_cache is None + else setup_options.requirements_cache) + # Populate cache with packages from requirements and stage the files + # in the cache. + if not os.path.exists(requirements_cache_path): + os.makedirs(requirements_cache_path) + populate_requirements_cache( + setup_options.requirements_file, requirements_cache_path) + for pkg in glob.glob(os.path.join(requirements_cache_path, '*')): + file_copy(pkg, utils.path.join(google_cloud_options.staging_location, + os.path.basename(pkg))) + resources.append(os.path.basename(pkg)) + + # Handle a setup file if present. + # We will build the setup package locally and then copy it to the staging + # location because the staging location is a GCS path and the file cannot be + # created directly there. + if setup_options.setup_file is not None: + if not os.path.isfile(setup_options.setup_file): + raise RuntimeError('The file %s cannot be found. It was specified in the ' + '--setup_file command line option.' % + setup_options.setup_file) + if os.path.basename(setup_options.setup_file) != 'setup.py': + raise RuntimeError( + 'The --setup_file option expects the full path to a file named ' + 'setup.py instead of %s' % setup_options.setup_file) + tarball_file = _build_setup_package(setup_options.setup_file, temp_dir, + build_setup_args) + staged_path = utils.path.join(google_cloud_options.staging_location, + WORKFLOW_TARBALL_FILE) + file_copy(tarball_file, staged_path) + resources.append(WORKFLOW_TARBALL_FILE) + + # Handle extra local packages that should be staged. + if setup_options.extra_packages is not None: + resources.extend( + _stage_extra_packages(setup_options.extra_packages, + google_cloud_options.staging_location, + temp_dir=temp_dir, file_copy=file_copy)) + + # Pickle the main session if requested. 
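For a concrete sense of how this entry point behaves, the following minimal sketch drives stage_job_resources against a purely local staging directory, mirroring test_with_main_session in the accompanying test module; the local directory stands in for a gs:// location and no GCS access is assumed:

    import tempfile

    from apache_beam.runners.dataflow.internal import dependency
    from apache_beam.utils.pipeline_options import GoogleCloudOptions
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    staging_dir = tempfile.mkdtemp()  # stands in for a gs:// staging location
    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).staging_location = staging_dir
    options.view_as(GoogleCloudOptions).temp_location = staging_dir
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(SetupOptions).sdk_location = ''  # empty value skips SDK tarball staging

    # Expected to return [names.PICKLED_MAIN_SESSION_FILE], with the pickled
    # session file copied into staging_dir.
    print(dependency.stage_job_resources(options))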
+ # We will create the pickled main session locally and then copy it to the + # staging location because the staging location is a GCS path and the file + # cannot be created directly there. + if setup_options.save_main_session: + pickled_session_file = os.path.join(temp_dir, + names.PICKLED_MAIN_SESSION_FILE) + pickler.dump_session(pickled_session_file) + staged_path = utils.path.join(google_cloud_options.staging_location, + names.PICKLED_MAIN_SESSION_FILE) + file_copy(pickled_session_file, staged_path) + resources.append(names.PICKLED_MAIN_SESSION_FILE) + + if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location: + if setup_options.sdk_location == 'default': + stage_tarball_from_remote_location = True + elif (setup_options.sdk_location.startswith('gs://') or + setup_options.sdk_location.startswith('http://') or + setup_options.sdk_location.startswith('https://')): + stage_tarball_from_remote_location = True + else: + stage_tarball_from_remote_location = False + + staged_path = utils.path.join(google_cloud_options.staging_location, + names.DATAFLOW_SDK_TARBALL_FILE) + if stage_tarball_from_remote_location: + # If --sdk_location is not specified then the appropriate package + # will be obtained from PyPI (https://pypi.python.org) based on the + # version of the currently running SDK. If the option is + # present then no version matching is made and the exact URL or path + # is expected. + # + # Unit tests running in the 'python setup.py test' context will + # not have the sdk_location attribute present and therefore we + # will not stage a tarball. + if setup_options.sdk_location == 'default': + sdk_remote_location = 'pypi' + else: + sdk_remote_location = setup_options.sdk_location + _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir) + resources.append(names.DATAFLOW_SDK_TARBALL_FILE) + else: + # Check if we have a local Dataflow SDK tarball present. This branch is + # used by tests running with the SDK built at head. + if setup_options.sdk_location == 'default': + module_path = os.path.abspath(__file__) + sdk_path = os.path.join( + os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE) + elif os.path.isdir(setup_options.sdk_location): + sdk_path = os.path.join( + setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE) + else: + sdk_path = setup_options.sdk_location + if os.path.isfile(sdk_path): + logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path) + file_copy(sdk_path, staged_path) + resources.append(names.DATAFLOW_SDK_TARBALL_FILE) + else: + if setup_options.sdk_location == 'default': + raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"', + sdk_path) + else: + raise RuntimeError( + 'The file "%s" cannot be found. Its location was specified by ' + 'the --sdk_location command-line option.' % + sdk_path) + + # Delete all temp files created while staging job resources. + shutil.rmtree(temp_dir) + return resources + + +def _build_setup_package(setup_file, temp_dir, build_setup_args=None): + saved_current_directory = os.getcwd() + try: + os.chdir(os.path.dirname(setup_file)) + if build_setup_args is None: + build_setup_args = [ + _get_python_executable(), os.path.basename(setup_file), + 'sdist', '--dist-dir', temp_dir] + logging.info('Executing command: %s', build_setup_args) + processes.check_call(build_setup_args) + output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz')) + if not output_files: + raise RuntimeError( + 'File %s not found.' 
% os.path.join(temp_dir, '*.tar.gz')) + return output_files[0] + finally: + os.chdir(saved_current_directory) + + +def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir): + """Stage a Dataflow SDK tarball with the appropriate version. + + Args: + sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from + the file can be downloaded. + staged_path: GCS path where the found SDK tarball should be copied. + temp_dir: path to temporary location where the file should be downloaded. + + Raises: + RuntimeError: If wget on the URL specified returs errors or the file + cannot be copied from/to GCS. + """ + if (sdk_remote_location.startswith('http://') or + sdk_remote_location.startswith('https://')): + logging.info( + 'Staging Dataflow SDK tarball from %s to %s', + sdk_remote_location, staged_path) + local_download_file = _dependency_file_download( + sdk_remote_location, temp_dir) + _dependency_file_copy(local_download_file, staged_path) + elif sdk_remote_location.startswith('gs://'): + # Stage the file to the GCS staging area. + logging.info( + 'Staging Dataflow SDK tarball from %s to %s', + sdk_remote_location, staged_path) + _dependency_file_copy(sdk_remote_location, staged_path) + elif sdk_remote_location == 'pypi': + logging.info('Staging the SDK tarball from PyPI to %s', staged_path) + _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path) + else: + raise RuntimeError( + 'The --sdk_location option was used with an unsupported ' + 'type of location: %s' % sdk_remote_location) + + +def get_required_container_version(): + """Returns the Google Cloud Dataflow container version for remote execution. + """ + # TODO(silviuc): Handle apache-beam versions when we have official releases. + import pkg_resources as pkg + try: + version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version + # We drop any pre/post parts of the version and we keep only the X.Y.Z + # format. For instance the 0.3.0rc2 SDK version translates into 0.3.0. + container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release + # We do, however, keep the ".dev" suffix if it is present. + if re.match(r'.*\.dev[0-9]*$', version): + container_version += '.dev' + return container_version + except pkg.DistributionNotFound: + # This case covers Apache Beam end-to-end testing scenarios. All these tests + # will run with a special container version. + return BEAM_CONTAINER_VERSION + + +def get_sdk_name_and_version(): + """Returns name and version of SDK reported to Google Cloud Dataflow.""" + # TODO(ccy): Make this check cleaner. + container_version = get_required_container_version() + if container_version == BEAM_CONTAINER_VERSION: + return ('Apache Beam SDK for Python', beam_version.__version__) + else: + return ('Google Cloud Dataflow SDK for Python', container_version) + + +def _download_pypi_sdk_package(temp_dir): + """Downloads SDK package from PyPI and returns path to local path.""" + # TODO(silviuc): Handle apache-beam versions when we have official releases. + import pkg_resources as pkg + try: + version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version + except pkg.DistributionNotFound: + raise RuntimeError('Please set --sdk_location command-line option ' + 'or install a valid {} distribution.' + .format(GOOGLE_PACKAGE_NAME)) + + # Get a source distribution for the SDK package from PyPI. 
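To make the version mangling in get_required_container_version above concrete, here is the same pkg_resources-based transformation applied to two hypothetical SDK version strings (the versions are made up; the calls match the function's own logic):

    import re
    import pkg_resources as pkg

    for version in ['0.3.0rc2', '0.6.0.dev1']:  # hypothetical version strings
      release = pkg.parse_version(version)._version.release  # e.g. (0, 3, 0)
      container_version = '%s.%s.%s' % release
      if re.match(r'.*\.dev[0-9]*$', version):
        container_version += '.dev'
      print('%s -> %s' % (version, container_version))  # 0.3.0 and 0.6.0.dev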
+ cmd_args = [ + _get_python_executable(), '-m', 'pip', 'install', '--download', temp_dir, + '%s==%s' % (GOOGLE_PACKAGE_NAME, version), + '--no-binary', ':all:', '--no-deps'] + logging.info('Executing command: %s', cmd_args) + processes.check_call(cmd_args) + zip_expected = os.path.join( + temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version)) + if os.path.exists(zip_expected): + return zip_expected + tgz_expected = os.path.join( + temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version)) + if os.path.exists(tgz_expected): + return tgz_expected + raise RuntimeError( + 'Failed to download a source distribution for the running SDK. Expected ' + 'either %s or %s to be found in the download folder.' % ( + zip_expected, tgz_expected)) http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py new file mode 100644 index 0000000..0657a07 --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -0,0 +1,425 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the setup module.""" + +import logging +import os +import shutil +import tempfile +import unittest + +from apache_beam import utils +from apache_beam.runners.dataflow.internal import dependency +from apache_beam.utils import names +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions + + +class SetupTest(unittest.TestCase): + + def update_options(self, options): + setup_options = options.view_as(SetupOptions) + setup_options.sdk_location = '' + google_cloud_options = options.view_as(GoogleCloudOptions) + if google_cloud_options.temp_location is None: + google_cloud_options.temp_location = google_cloud_options.staging_location + + def create_temp_file(self, path, contents): + with open(path, 'w') as f: + f.write(contents) + return f.name + + def populate_requirements_cache(self, requirements_file, cache_dir): + _ = requirements_file + self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing') + self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing') + + def test_no_staging_location(self): + with self.assertRaises(RuntimeError) as cm: + dependency.stage_job_resources(PipelineOptions()) + self.assertEqual('The --staging_location option must be specified.', + cm.exception.message) + + def test_no_temp_location(self): + staging_dir = tempfile.mkdtemp() + options = PipelineOptions() + google_cloud_options = options.view_as(GoogleCloudOptions) + google_cloud_options.staging_location = staging_dir + self.update_options(options) + google_cloud_options.temp_location = None + with self.assertRaises(RuntimeError) as cm: + dependency.stage_job_resources(options) + self.assertEqual('The --temp_location option must be specified.', + cm.exception.message) + + def test_no_main_session(self): + staging_dir = tempfile.mkdtemp() + options = PipelineOptions() + + options.view_as(GoogleCloudOptions).staging_location = staging_dir + options.view_as(SetupOptions).save_main_session = False + self.update_options(options) + + self.assertEqual( + [], + dependency.stage_job_resources(options)) + + def test_with_main_session(self): + staging_dir = tempfile.mkdtemp() + options = PipelineOptions() + + options.view_as(GoogleCloudOptions).staging_location = staging_dir + options.view_as(SetupOptions).save_main_session = True + self.update_options(options) + + self.assertEqual( + [names.PICKLED_MAIN_SESSION_FILE], + dependency.stage_job_resources(options)) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE))) + + def test_default_resources(self): + staging_dir = tempfile.mkdtemp() + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + + self.assertEqual( + [], + dependency.stage_job_resources(options)) + + def test_with_requirements_file(self): + try: + staging_dir = tempfile.mkdtemp() + requirements_cache_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).requirements_cache = requirements_cache_dir + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, dependency.REQUIREMENTS_FILE) + self.create_temp_file( + os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') + self.assertEqual( + 
sorted([dependency.REQUIREMENTS_FILE, + 'abc.txt', 'def.txt']), + sorted(dependency.stage_job_resources( + options, + populate_requirements_cache=self.populate_requirements_cache))) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) + finally: + shutil.rmtree(staging_dir) + shutil.rmtree(requirements_cache_dir) + shutil.rmtree(source_dir) + + def test_requirements_file_not_present(self): + staging_dir = tempfile.mkdtemp() + with self.assertRaises(RuntimeError) as cm: + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).requirements_file = 'nosuchfile' + dependency.stage_job_resources( + options, populate_requirements_cache=self.populate_requirements_cache) + self.assertEqual( + cm.exception.message, + 'The file %s cannot be found. It was specified in the ' + '--requirements_file command line option.' % 'nosuchfile') + + def test_with_requirements_file_and_cache(self): + staging_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, dependency.REQUIREMENTS_FILE) + options.view_as(SetupOptions).requirements_cache = os.path.join( + tempfile.gettempdir(), 'alternative-cache-dir') + self.create_temp_file( + os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') + self.assertEqual( + sorted([dependency.REQUIREMENTS_FILE, + 'abc.txt', 'def.txt']), + sorted(dependency.stage_job_resources( + options, + populate_requirements_cache=self.populate_requirements_cache))) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) + + def test_with_setup_file(self): + staging_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + self.create_temp_file( + os.path.join(source_dir, 'setup.py'), 'notused') + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).setup_file = os.path.join( + source_dir, 'setup.py') + + self.assertEqual( + [dependency.WORKFLOW_TARBALL_FILE], + dependency.stage_job_resources( + options, + # We replace the build setup command because a realistic one would + # require the setuptools package to be installed. Note that we can't + # use "touch" here to create the expected output tarball file, since + # touch is not available on Windows, so we invoke python to produce + # equivalent behavior. 
+ build_setup_args=[ + 'python', '-c', 'open(__import__("sys").argv[1], "a")', + os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)], + temp_dir=source_dir)) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE))) + + def test_setup_file_not_present(self): + staging_dir = tempfile.mkdtemp() + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).setup_file = 'nosuchfile' + + with self.assertRaises(RuntimeError) as cm: + dependency.stage_job_resources(options) + self.assertEqual( + cm.exception.message, + 'The file %s cannot be found. It was specified in the ' + '--setup_file command line option.' % 'nosuchfile') + + def test_setup_file_not_named_setup_dot_py(self): + staging_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).setup_file = ( + os.path.join(source_dir, 'xyz-setup.py')) + + self.create_temp_file( + os.path.join(source_dir, 'xyz-setup.py'), 'notused') + with self.assertRaises(RuntimeError) as cm: + dependency.stage_job_resources(options) + self.assertTrue( + cm.exception.message.startswith( + 'The --setup_file option expects the full path to a file named ' + 'setup.py instead of ')) + + def override_file_copy(self, expected_from_path, expected_to_dir): + def file_copy(from_path, to_path): + if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE): + self.assertEqual(expected_from_path, from_path) + self.assertEqual(utils.path.join(expected_to_dir, + names.DATAFLOW_SDK_TARBALL_FILE), + to_path) + if from_path.startswith('gs://') or to_path.startswith('gs://'): + logging.info('Faking file_copy(%s, %s)', from_path, to_path) + else: + shutil.copyfile(from_path, to_path) + dependency._dependency_file_copy = file_copy + + def override_file_download(self, expected_from_url, expected_to_folder): + def file_download(from_url, _): + self.assertEqual(expected_from_url, from_url) + tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') + with open(tarball_path, 'w') as f: + f.write('Some contents.') + return tarball_path + dependency._dependency_file_download = file_download + return os.path.join(expected_to_folder, 'sdk-tarball') + + def override_pypi_download(self, expected_from_url, expected_to_folder): + def pypi_download(_): + tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') + with open(tarball_path, 'w') as f: + f.write('Some contents.') + return tarball_path + dependency._download_pypi_sdk_package = pypi_download + return os.path.join(expected_to_folder, 'sdk-tarball') + + def test_sdk_location_default(self): + staging_dir = tempfile.mkdtemp() + expected_from_url = 'pypi' + expected_from_path = self.override_pypi_download( + expected_from_url, staging_dir) + self.override_file_copy(expected_from_path, staging_dir) + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = 'default' + + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources( + options, + file_copy=dependency._dependency_file_copy)) + + def test_sdk_location_local(self): + staging_dir = tempfile.mkdtemp() + sdk_location = tempfile.mkdtemp() + self.create_temp_file( + os.path.join( + sdk_location, + 
names.DATAFLOW_SDK_TARBALL_FILE), + 'contents') + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources(options)) + tarball_path = os.path.join( + staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) + with open(tarball_path) as f: + self.assertEqual(f.read(), 'contents') + + def test_sdk_location_local_not_present(self): + staging_dir = tempfile.mkdtemp() + sdk_location = 'nosuchdir' + with self.assertRaises(RuntimeError) as cm: + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + dependency.stage_job_resources(options) + self.assertEqual( + 'The file "%s" cannot be found. Its ' + 'location was specified by the --sdk_location command-line option.' % + sdk_location, + cm.exception.message) + + def test_sdk_location_gcs(self): + staging_dir = tempfile.mkdtemp() + sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz' + self.override_file_copy(sdk_location, staging_dir) + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources(options)) + + def test_with_extra_packages(self): + staging_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + self.create_temp_file( + os.path.join(source_dir, 'abc.tar.gz'), 'nothing') + self.create_temp_file( + os.path.join(source_dir, 'xyz.tar.gz'), 'nothing') + self.create_temp_file( + os.path.join(source_dir, 'xyz2.tar'), 'nothing') + self.create_temp_file( + os.path.join(source_dir, 'whl.whl'), 'nothing') + self.create_temp_file( + os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing') + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).extra_packages = [ + os.path.join(source_dir, 'abc.tar.gz'), + os.path.join(source_dir, 'xyz.tar.gz'), + os.path.join(source_dir, 'xyz2.tar'), + os.path.join(source_dir, 'whl.whl'), + 'gs://my-gcs-bucket/gcs.tar.gz'] + + gcs_copied_files = [] + + def file_copy(from_path, to_path): + if from_path.startswith('gs://'): + gcs_copied_files.append(from_path) + _, from_name = os.path.split(from_path) + self.create_temp_file(os.path.join(to_path, from_name), 'nothing') + logging.info('Fake copied GCS file: %s to %s', from_path, to_path) + elif to_path.startswith('gs://'): + logging.info('Faking file_copy(%s, %s)', from_path, to_path) + else: + shutil.copyfile(from_path, to_path) + + dependency._dependency_file_copy = file_copy + + self.assertEqual( + ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz', + dependency.EXTRA_PACKAGES_FILE], + dependency.stage_job_resources(options)) + with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f: + self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n', + 'whl.whl\n', 'gcs.tar.gz\n'], f.readlines()) + self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files) + + def test_with_extra_packages_missing_files(self): + staging_dir = tempfile.mkdtemp() + with self.assertRaises(RuntimeError) as cm: + + options = PipelineOptions() + 
options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz'] + + dependency.stage_job_resources(options) + self.assertEqual( + cm.exception.message, + 'The file %s cannot be found. It was specified in the ' + '--extra_packages command line option.' % 'nosuchfile.tar.gz') + + def test_with_extra_packages_invalid_file_name(self): + staging_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() + self.create_temp_file( + os.path.join(source_dir, 'abc.tgz'), 'nothing') + with self.assertRaises(RuntimeError) as cm: + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).extra_packages = [ + os.path.join(source_dir, 'abc.tgz')] + dependency.stage_job_resources(options) + self.assertEqual( + cm.exception.message, + 'The --extra_package option expects a full path ending with ".tar" or ' + '".tar.gz" instead of %s' % os.path.join(source_dir, 'abc.tgz')) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py deleted file mode 100644 index 11a2e1c..0000000 --- a/sdks/python/apache_beam/utils/dependency.py +++ /dev/null @@ -1,504 +0,0 @@ - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Support for installing custom code and required dependencies. - -Workflows, with the exception of very simple ones, are organized in multiple -modules and packages. Typically, these modules and packages have -dependencies on other standard libraries. Dataflow relies on the Python -setuptools package to handle these scenarios. For further details please read: -https://pythonhosted.org/an_example_pypi_project/setuptools.html - -When a runner tries to run a pipeline it will check for a --requirements_file -and a --setup_file option. - -If --setup_file is present then it is assumed that the folder containing the -file specified by the option has the typical layout required by setuptools and -it will run 'python setup.py sdist' to produce a source distribution. The -resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging -location specified as job option. When a worker starts it will check for the -presence of this file and will run 'easy_install tarball' to install the -package in the worker. - -If --requirements_file is present then the file specified by the option will be -staged in the GCS staging location. 
When a worker starts it will check for the -presence of this file and will run 'pip install -r requirements.txt'. A -requirements file can be easily generated by running 'pip freeze -r -requirements.txt'. The reason a Dataflow runner does not run this automatically -is because quite often only a small fraction of the dependencies present in a -requirements.txt file are actually needed for remote execution and therefore a -one-time manual trimming is desirable. - -TODO(silviuc): Staged files should have a job specific prefix. -To prevent several jobs in the same project stomping on each other due to a -shared staging location. - -TODO(silviuc): Should we allow several setup packages? -TODO(silviuc): We should allow customizing the exact command for setup build. -""" - -import glob -import logging -import os -import re -import shutil -import sys -import tempfile - - -from apache_beam import utils -from apache_beam import version as beam_version -from apache_beam.internal import pickler -from apache_beam.utils import names -from apache_beam.utils import processes -from apache_beam.utils.pipeline_options import GoogleCloudOptions -from apache_beam.utils.pipeline_options import SetupOptions - - -# Standard file names used for staging files. -WORKFLOW_TARBALL_FILE = 'workflow.tar.gz' -REQUIREMENTS_FILE = 'requirements.txt' -EXTRA_PACKAGES_FILE = 'extra_packages.txt' - -GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow' - - -def _dependency_file_copy(from_path, to_path): - """Copies a local file to a GCS file or vice versa.""" - logging.info('file copy from %s to %s.', from_path, to_path) - if from_path.startswith('gs://') or to_path.startswith('gs://'): - command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path] - logging.info('Executing command: %s', command_args) - processes.check_call(command_args) - else: - # Branch used only for unit tests and integration tests. - # In such environments GCS support is not available. - if not os.path.isdir(os.path.dirname(to_path)): - logging.info('Created folder (since we have not done yet, and any errors ' - 'will follow): %s ', os.path.dirname(to_path)) - os.mkdir(os.path.dirname(to_path)) - shutil.copyfile(from_path, to_path) - - -def _dependency_file_download(from_url, to_folder): - """Downloads a file from a URL and returns path to the local file.""" - # TODO(silviuc): We should cache downloads so we do not do it for every job. - try: - # We check if the file is actually there because wget returns a file - # even for a 404 response (file will contain the contents of the 404 - # response). - response, content = __import__('httplib2').Http().request(from_url) - if int(response['status']) >= 400: - raise RuntimeError( - 'Dataflow SDK not found at %s (response: %s)' % (from_url, response)) - local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz') - with open(local_download_file, 'w') as f: - f.write(content) - except Exception: - logging.info('Failed to download SDK from %s', from_url) - raise - return local_download_file - - -def _stage_extra_packages(extra_packages, staging_location, temp_dir, - file_copy=_dependency_file_copy): - """Stages a list of local extra packages. - - Args: - extra_packages: Ordered list of local paths to extra packages to be staged. - staging_location: Staging location for the packages. - temp_dir: Temporary folder where the resource building can happen. Caller - is responsible for cleaning up this folder after this function returns. - file_copy: Callable for copying files. 
The default version will copy from - a local file to a GCS location using the gsutil tool available in the - Google Cloud SDK package. - - Returns: - A list of file names (no paths) for the resources staged. All the files - are assumed to be staged in staging_location. - - Raises: - RuntimeError: If files specified are not found or do not have expected - name patterns. - """ - resources = [] - staging_temp_dir = None - local_packages = [] - for package in extra_packages: - if not (os.path.basename(package).endswith('.tar') or - os.path.basename(package).endswith('.tar.gz') or - os.path.basename(package).endswith('.whl')): - raise RuntimeError( - 'The --extra_package option expects a full path ending with ' - '".tar" or ".tar.gz" instead of %s' % package) - if os.path.basename(package).endswith('.whl'): - logging.warning( - 'The .whl package "%s" is provided in --extra_package. ' - 'This functionality is not officially supported. Since wheel ' - 'packages are binary distributions, this package must be ' - 'binary-compatible with the worker environment (e.g. Python 2.7 ' - 'running on an x64 Linux host).') - - if not os.path.isfile(package): - if package.startswith('gs://'): - if not staging_temp_dir: - staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) - logging.info('Downloading extra package: %s locally before staging', - package) - _dependency_file_copy(package, staging_temp_dir) - else: - raise RuntimeError( - 'The file %s cannot be found. It was specified in the ' - '--extra_packages command line option.' % package) - else: - local_packages.append(package) - - if staging_temp_dir: - local_packages.extend( - [utils.path.join(staging_temp_dir, f) for f in os.listdir( - staging_temp_dir)]) - - for package in local_packages: - basename = os.path.basename(package) - staged_path = utils.path.join(staging_location, basename) - file_copy(package, staged_path) - resources.append(basename) - # Create a file containing the list of extra packages and stage it. - # The file is important so that in the worker the packages are installed - # exactly in the order specified. This approach will avoid extra PyPI - # requests. For example if package A depends on package B and package A - # is installed first then the installer will try to satisfy the - # dependency on B by downloading the package from PyPI. If package B is - # installed first this is avoided. - with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f: - for package in local_packages: - f.write('%s\n' % os.path.basename(package)) - staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE) - # Note that the caller of this function is responsible for deleting the - # temporary folder where all temp files are created, including this one. - file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path) - resources.append(EXTRA_PACKAGES_FILE) - - return resources - - -def _get_python_executable(): - # Allow overriding the python executable to use for downloading and - # installing dependencies, otherwise use the python executable for - # the current process. - python_bin = os.environ.get('BEAM_PYTHON') or sys.executable - if not python_bin: - raise ValueError('Could not find Python executable.') - return python_bin - - -def _populate_requirements_cache(requirements_file, cache_dir): - # The 'pip download' command will not download again if it finds the - # tarball with the proper version already present. 
- # It will get the packages downloaded in the order they are presented in - # the requirements file and will not download package dependencies. - cmd_args = [ - _get_python_executable(), '-m', 'pip', 'install', '--download', cache_dir, - '-r', requirements_file, - # Download from PyPI source distributions. - '--no-binary', ':all:'] - logging.info('Executing command: %s', cmd_args) - processes.check_call(cmd_args) - - -def stage_job_resources( - options, file_copy=_dependency_file_copy, build_setup_args=None, - temp_dir=None, populate_requirements_cache=_populate_requirements_cache): - """Creates (if needed) and stages job resources to options.staging_location. - - Args: - options: Command line options. More specifically the function will expect - staging_location, requirements_file, setup_file, and save_main_session - options to be present. - file_copy: Callable for copying files. The default version will copy from - a local file to a GCS location using the gsutil tool available in the - Google Cloud SDK package. - build_setup_args: A list of command line arguments used to build a setup - package. Used only if options.setup_file is not None. Used only for - testing. - temp_dir: Temporary folder where the resource building can happen. If None - then a unique temp directory will be created. Used only for testing. - populate_requirements_cache: Callable for populating the requirements cache. - Used only for testing. - - Returns: - A list of file names (no paths) for the resources staged. All the files - are assumed to be staged in options.staging_location. - - Raises: - RuntimeError: If files specified are not found or error encountered while - trying to create the resources (e.g., build a setup package). - """ - temp_dir = temp_dir or tempfile.mkdtemp() - resources = [] - - google_cloud_options = options.view_as(GoogleCloudOptions) - setup_options = options.view_as(SetupOptions) - # Make sure that all required options are specified. There are a few that have - # defaults to support local running scenarios. - if google_cloud_options.staging_location is None: - raise RuntimeError( - 'The --staging_location option must be specified.') - if google_cloud_options.temp_location is None: - raise RuntimeError( - 'The --temp_location option must be specified.') - - # Stage a requirements file if present. - if setup_options.requirements_file is not None: - if not os.path.isfile(setup_options.requirements_file): - raise RuntimeError('The file %s cannot be found. It was specified in the ' - '--requirements_file command line option.' % - setup_options.requirements_file) - staged_path = utils.path.join(google_cloud_options.staging_location, - REQUIREMENTS_FILE) - file_copy(setup_options.requirements_file, staged_path) - resources.append(REQUIREMENTS_FILE) - requirements_cache_path = ( - os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') - if setup_options.requirements_cache is None - else setup_options.requirements_cache) - # Populate cache with packages from requirements and stage the files - # in the cache. - if not os.path.exists(requirements_cache_path): - os.makedirs(requirements_cache_path) - populate_requirements_cache( - setup_options.requirements_file, requirements_cache_path) - for pkg in glob.glob(os.path.join(requirements_cache_path, '*')): - file_copy(pkg, utils.path.join(google_cloud_options.staging_location, - os.path.basename(pkg))) - resources.append(os.path.basename(pkg)) - - # Handle a setup file if present. 
- # We will build the setup package locally and then copy it to the staging - # location because the staging location is a GCS path and the file cannot be - # created directly there. - if setup_options.setup_file is not None: - if not os.path.isfile(setup_options.setup_file): - raise RuntimeError('The file %s cannot be found. It was specified in the ' - '--setup_file command line option.' % - setup_options.setup_file) - if os.path.basename(setup_options.setup_file) != 'setup.py': - raise RuntimeError( - 'The --setup_file option expects the full path to a file named ' - 'setup.py instead of %s' % setup_options.setup_file) - tarball_file = _build_setup_package(setup_options.setup_file, temp_dir, - build_setup_args) - staged_path = utils.path.join(google_cloud_options.staging_location, - WORKFLOW_TARBALL_FILE) - file_copy(tarball_file, staged_path) - resources.append(WORKFLOW_TARBALL_FILE) - - # Handle extra local packages that should be staged. - if setup_options.extra_packages is not None: - resources.extend( - _stage_extra_packages(setup_options.extra_packages, - google_cloud_options.staging_location, - temp_dir=temp_dir, file_copy=file_copy)) - - # Pickle the main session if requested. - # We will create the pickled main session locally and then copy it to the - # staging location because the staging location is a GCS path and the file - # cannot be created directly there. - if setup_options.save_main_session: - pickled_session_file = os.path.join(temp_dir, - names.PICKLED_MAIN_SESSION_FILE) - pickler.dump_session(pickled_session_file) - staged_path = utils.path.join(google_cloud_options.staging_location, - names.PICKLED_MAIN_SESSION_FILE) - file_copy(pickled_session_file, staged_path) - resources.append(names.PICKLED_MAIN_SESSION_FILE) - - if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location: - if setup_options.sdk_location == 'default': - stage_tarball_from_remote_location = True - elif (setup_options.sdk_location.startswith('gs://') or - setup_options.sdk_location.startswith('http://') or - setup_options.sdk_location.startswith('https://')): - stage_tarball_from_remote_location = True - else: - stage_tarball_from_remote_location = False - - staged_path = utils.path.join(google_cloud_options.staging_location, - names.DATAFLOW_SDK_TARBALL_FILE) - if stage_tarball_from_remote_location: - # If --sdk_location is not specified then the appropriate package - # will be obtained from PyPI (https://pypi.python.org) based on the - # version of the currently running SDK. If the option is - # present then no version matching is made and the exact URL or path - # is expected. - # - # Unit tests running in the 'python setup.py test' context will - # not have the sdk_location attribute present and therefore we - # will not stage a tarball. - if setup_options.sdk_location == 'default': - sdk_remote_location = 'pypi' - else: - sdk_remote_location = setup_options.sdk_location - _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir) - resources.append(names.DATAFLOW_SDK_TARBALL_FILE) - else: - # Check if we have a local Dataflow SDK tarball present. This branch is - # used by tests running with the SDK built at head. 
- if setup_options.sdk_location == 'default': - module_path = os.path.abspath(__file__) - sdk_path = os.path.join( - os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE) - elif os.path.isdir(setup_options.sdk_location): - sdk_path = os.path.join( - setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE) - else: - sdk_path = setup_options.sdk_location - if os.path.isfile(sdk_path): - logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path) - file_copy(sdk_path, staged_path) - resources.append(names.DATAFLOW_SDK_TARBALL_FILE) - else: - if setup_options.sdk_location == 'default': - raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"', - sdk_path) - else: - raise RuntimeError( - 'The file "%s" cannot be found. Its location was specified by ' - 'the --sdk_location command-line option.' % - sdk_path) - - # Delete all temp files created while staging job resources. - shutil.rmtree(temp_dir) - return resources - - -def _build_setup_package(setup_file, temp_dir, build_setup_args=None): - saved_current_directory = os.getcwd() - try: - os.chdir(os.path.dirname(setup_file)) - if build_setup_args is None: - build_setup_args = [ - _get_python_executable(), os.path.basename(setup_file), - 'sdist', '--dist-dir', temp_dir] - logging.info('Executing command: %s', build_setup_args) - processes.check_call(build_setup_args) - output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz')) - if not output_files: - raise RuntimeError( - 'File %s not found.' % os.path.join(temp_dir, '*.tar.gz')) - return output_files[0] - finally: - os.chdir(saved_current_directory) - - -def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir): - """Stage a Dataflow SDK tarball with the appropriate version. - - Args: - sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from - the file can be downloaded. - staged_path: GCS path where the found SDK tarball should be copied. - temp_dir: path to temporary location where the file should be downloaded. - - Raises: - RuntimeError: If wget on the URL specified returs errors or the file - cannot be copied from/to GCS. - """ - if (sdk_remote_location.startswith('http://') or - sdk_remote_location.startswith('https://')): - logging.info( - 'Staging Dataflow SDK tarball from %s to %s', - sdk_remote_location, staged_path) - local_download_file = _dependency_file_download( - sdk_remote_location, temp_dir) - _dependency_file_copy(local_download_file, staged_path) - elif sdk_remote_location.startswith('gs://'): - # Stage the file to the GCS staging area. - logging.info( - 'Staging Dataflow SDK tarball from %s to %s', - sdk_remote_location, staged_path) - _dependency_file_copy(sdk_remote_location, staged_path) - elif sdk_remote_location == 'pypi': - logging.info('Staging the SDK tarball from PyPI to %s', staged_path) - _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path) - else: - raise RuntimeError( - 'The --sdk_location option was used with an unsupported ' - 'type of location: %s' % sdk_remote_location) - - -def get_required_container_version(): - """Returns the Google Cloud Dataflow container version for remote execution. - """ - # TODO(silviuc): Handle apache-beam versions when we have official releases. - import pkg_resources as pkg - try: - version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version - # We drop any pre/post parts of the version and we keep only the X.Y.Z - # format. For instance the 0.3.0rc2 SDK version translates into 0.3.0. 
- container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release - # We do, however, keep the ".dev" suffix if it is present. - if re.match(r'.*\.dev[0-9]*$', version): - container_version += '.dev' - return container_version - except pkg.DistributionNotFound: - # This case covers Apache Beam end-to-end testing scenarios. All these tests - # will run with a special container version. - return 'beamhead' - - -def get_sdk_name_and_version(): - """Returns name and version of SDK reported to Google Cloud Dataflow.""" - # TODO(ccy): Make this check cleaner. - container_version = get_required_container_version() - if container_version == 'beamhead': - return ('Apache Beam SDK for Python', beam_version.__version__) - else: - return ('Google Cloud Dataflow SDK for Python', container_version) - - -def _download_pypi_sdk_package(temp_dir): - """Downloads the SDK package from PyPI and returns its local path.""" - # TODO(silviuc): Handle apache-beam versions when we have official releases. - import pkg_resources as pkg - try: - version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version - except pkg.DistributionNotFound: - raise RuntimeError('Please set --sdk_location command-line option ' - 'or install a valid {} distribution.' - .format(GOOGLE_PACKAGE_NAME)) - - # Get a source distribution for the SDK package from PyPI. - cmd_args = [ - _get_python_executable(), '-m', 'pip', 'install', '--download', temp_dir, - '%s==%s' % (GOOGLE_PACKAGE_NAME, version), - '--no-binary', ':all:', '--no-deps'] - logging.info('Executing command: %s', cmd_args) - processes.check_call(cmd_args) - zip_expected = os.path.join( - temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version)) - if os.path.exists(zip_expected): - return zip_expected - tgz_expected = os.path.join( - temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version)) - if os.path.exists(tgz_expected): - return tgz_expected - raise RuntimeError( - 'Failed to download a source distribution for the running SDK. Expected ' - 'either %s or %s to be found in the download folder.' % ( - zip_expected, tgz_expected)) http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py deleted file mode 100644 index 75a89e2..0000000 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ /dev/null @@ -1,425 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License.
-# - -"""Unit tests for the setup module.""" - -import logging -import os -import shutil -import tempfile -import unittest - -from apache_beam import utils -from apache_beam.utils import dependency -from apache_beam.utils import names -from apache_beam.utils.pipeline_options import GoogleCloudOptions -from apache_beam.utils.pipeline_options import PipelineOptions -from apache_beam.utils.pipeline_options import SetupOptions - - -class SetupTest(unittest.TestCase): - - def update_options(self, options): - setup_options = options.view_as(SetupOptions) - setup_options.sdk_location = '' - google_cloud_options = options.view_as(GoogleCloudOptions) - if google_cloud_options.temp_location is None: - google_cloud_options.temp_location = google_cloud_options.staging_location - - def create_temp_file(self, path, contents): - with open(path, 'w') as f: - f.write(contents) - return f.name - - def populate_requirements_cache(self, requirements_file, cache_dir): - _ = requirements_file - self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing') - self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing') - - def test_no_staging_location(self): - with self.assertRaises(RuntimeError) as cm: - dependency.stage_job_resources(PipelineOptions()) - self.assertEqual('The --staging_location option must be specified.', - cm.exception.message) - - def test_no_temp_location(self): - staging_dir = tempfile.mkdtemp() - options = PipelineOptions() - google_cloud_options = options.view_as(GoogleCloudOptions) - google_cloud_options.staging_location = staging_dir - self.update_options(options) - google_cloud_options.temp_location = None - with self.assertRaises(RuntimeError) as cm: - dependency.stage_job_resources(options) - self.assertEqual('The --temp_location option must be specified.', - cm.exception.message) - - def test_no_main_session(self): - staging_dir = tempfile.mkdtemp() - options = PipelineOptions() - - options.view_as(GoogleCloudOptions).staging_location = staging_dir - options.view_as(SetupOptions).save_main_session = False - self.update_options(options) - - self.assertEqual( - [], - dependency.stage_job_resources(options)) - - def test_with_main_session(self): - staging_dir = tempfile.mkdtemp() - options = PipelineOptions() - - options.view_as(GoogleCloudOptions).staging_location = staging_dir - options.view_as(SetupOptions).save_main_session = True - self.update_options(options) - - self.assertEqual( - [names.PICKLED_MAIN_SESSION_FILE], - dependency.stage_job_resources(options)) - self.assertTrue( - os.path.isfile( - os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE))) - - def test_default_resources(self): - staging_dir = tempfile.mkdtemp() - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - - self.assertEqual( - [], - dependency.stage_job_resources(options)) - - def test_with_requirements_file(self): - try: - staging_dir = tempfile.mkdtemp() - requirements_cache_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).requirements_cache = requirements_cache_dir - options.view_as(SetupOptions).requirements_file = os.path.join( - source_dir, dependency.REQUIREMENTS_FILE) - self.create_temp_file( - os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') - self.assertEqual( - sorted([dependency.REQUIREMENTS_FILE, - 'abc.txt', 
'def.txt']), - sorted(dependency.stage_job_resources( - options, - populate_requirements_cache=self.populate_requirements_cache))) - self.assertTrue( - os.path.isfile( - os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) - self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) - self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) - finally: - shutil.rmtree(staging_dir) - shutil.rmtree(requirements_cache_dir) - shutil.rmtree(source_dir) - - def test_requirements_file_not_present(self): - staging_dir = tempfile.mkdtemp() - with self.assertRaises(RuntimeError) as cm: - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).requirements_file = 'nosuchfile' - dependency.stage_job_resources( - options, populate_requirements_cache=self.populate_requirements_cache) - self.assertEqual( - cm.exception.message, - 'The file %s cannot be found. It was specified in the ' - '--requirements_file command line option.' % 'nosuchfile') - - def test_with_requirements_file_and_cache(self): - staging_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).requirements_file = os.path.join( - source_dir, dependency.REQUIREMENTS_FILE) - options.view_as(SetupOptions).requirements_cache = os.path.join( - tempfile.gettempdir(), 'alternative-cache-dir') - self.create_temp_file( - os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') - self.assertEqual( - sorted([dependency.REQUIREMENTS_FILE, - 'abc.txt', 'def.txt']), - sorted(dependency.stage_job_resources( - options, - populate_requirements_cache=self.populate_requirements_cache))) - self.assertTrue( - os.path.isfile( - os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) - self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) - self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) - - def test_with_setup_file(self): - staging_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - self.create_temp_file( - os.path.join(source_dir, 'setup.py'), 'notused') - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).setup_file = os.path.join( - source_dir, 'setup.py') - - self.assertEqual( - [dependency.WORKFLOW_TARBALL_FILE], - dependency.stage_job_resources( - options, - # We replace the build setup command because a realistic one would - # require the setuptools package to be installed. Note that we can't - # use "touch" here to create the expected output tarball file, since - # touch is not available on Windows, so we invoke python to produce - # equivalent behavior. 
- build_setup_args=[ - 'python', '-c', 'open(__import__("sys").argv[1], "a")', - os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)], - temp_dir=source_dir)) - self.assertTrue( - os.path.isfile( - os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE))) - - def test_setup_file_not_present(self): - staging_dir = tempfile.mkdtemp() - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).setup_file = 'nosuchfile' - - with self.assertRaises(RuntimeError) as cm: - dependency.stage_job_resources(options) - self.assertEqual( - cm.exception.message, - 'The file %s cannot be found. It was specified in the ' - '--setup_file command line option.' % 'nosuchfile') - - def test_setup_file_not_named_setup_dot_py(self): - staging_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).setup_file = ( - os.path.join(source_dir, 'xyz-setup.py')) - - self.create_temp_file( - os.path.join(source_dir, 'xyz-setup.py'), 'notused') - with self.assertRaises(RuntimeError) as cm: - dependency.stage_job_resources(options) - self.assertTrue( - cm.exception.message.startswith( - 'The --setup_file option expects the full path to a file named ' - 'setup.py instead of ')) - - def override_file_copy(self, expected_from_path, expected_to_dir): - def file_copy(from_path, to_path): - if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE): - self.assertEqual(expected_from_path, from_path) - self.assertEqual(utils.path.join(expected_to_dir, - names.DATAFLOW_SDK_TARBALL_FILE), - to_path) - if from_path.startswith('gs://') or to_path.startswith('gs://'): - logging.info('Faking file_copy(%s, %s)', from_path, to_path) - else: - shutil.copyfile(from_path, to_path) - dependency._dependency_file_copy = file_copy - - def override_file_download(self, expected_from_url, expected_to_folder): - def file_download(from_url, _): - self.assertEqual(expected_from_url, from_url) - tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') - with open(tarball_path, 'w') as f: - f.write('Some contents.') - return tarball_path - dependency._dependency_file_download = file_download - return os.path.join(expected_to_folder, 'sdk-tarball') - - def override_pypi_download(self, expected_from_url, expected_to_folder): - def pypi_download(_): - tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') - with open(tarball_path, 'w') as f: - f.write('Some contents.') - return tarball_path - dependency._download_pypi_sdk_package = pypi_download - return os.path.join(expected_to_folder, 'sdk-tarball') - - def test_sdk_location_default(self): - staging_dir = tempfile.mkdtemp() - expected_from_url = 'pypi' - expected_from_path = self.override_pypi_download( - expected_from_url, staging_dir) - self.override_file_copy(expected_from_path, staging_dir) - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).sdk_location = 'default' - - self.assertEqual( - [names.DATAFLOW_SDK_TARBALL_FILE], - dependency.stage_job_resources( - options, - file_copy=dependency._dependency_file_copy)) - - def test_sdk_location_local(self): - staging_dir = tempfile.mkdtemp() - sdk_location = tempfile.mkdtemp() - self.create_temp_file( - os.path.join( - sdk_location, - 
names.DATAFLOW_SDK_TARBALL_FILE), - 'contents') - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).sdk_location = sdk_location - - self.assertEqual( - [names.DATAFLOW_SDK_TARBALL_FILE], - dependency.stage_job_resources(options)) - tarball_path = os.path.join( - staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) - with open(tarball_path) as f: - self.assertEqual(f.read(), 'contents') - - def test_sdk_location_local_not_present(self): - staging_dir = tempfile.mkdtemp() - sdk_location = 'nosuchdir' - with self.assertRaises(RuntimeError) as cm: - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).sdk_location = sdk_location - - dependency.stage_job_resources(options) - self.assertEqual( - 'The file "%s" cannot be found. Its ' - 'location was specified by the --sdk_location command-line option.' % - sdk_location, - cm.exception.message) - - def test_sdk_location_gcs(self): - staging_dir = tempfile.mkdtemp() - sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz' - self.override_file_copy(sdk_location, staging_dir) - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).sdk_location = sdk_location - - self.assertEqual( - [names.DATAFLOW_SDK_TARBALL_FILE], - dependency.stage_job_resources(options)) - - def test_with_extra_packages(self): - staging_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - self.create_temp_file( - os.path.join(source_dir, 'abc.tar.gz'), 'nothing') - self.create_temp_file( - os.path.join(source_dir, 'xyz.tar.gz'), 'nothing') - self.create_temp_file( - os.path.join(source_dir, 'xyz2.tar'), 'nothing') - self.create_temp_file( - os.path.join(source_dir, 'whl.whl'), 'nothing') - self.create_temp_file( - os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing') - - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).extra_packages = [ - os.path.join(source_dir, 'abc.tar.gz'), - os.path.join(source_dir, 'xyz.tar.gz'), - os.path.join(source_dir, 'xyz2.tar'), - os.path.join(source_dir, 'whl.whl'), - 'gs://my-gcs-bucket/gcs.tar.gz'] - - gcs_copied_files = [] - - def file_copy(from_path, to_path): - if from_path.startswith('gs://'): - gcs_copied_files.append(from_path) - _, from_name = os.path.split(from_path) - self.create_temp_file(os.path.join(to_path, from_name), 'nothing') - logging.info('Fake copied GCS file: %s to %s', from_path, to_path) - elif to_path.startswith('gs://'): - logging.info('Faking file_copy(%s, %s)', from_path, to_path) - else: - shutil.copyfile(from_path, to_path) - - dependency._dependency_file_copy = file_copy - - self.assertEqual( - ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz', - dependency.EXTRA_PACKAGES_FILE], - dependency.stage_job_resources(options)) - with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f: - self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n', - 'whl.whl\n', 'gcs.tar.gz\n'], f.readlines()) - self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files) - - def test_with_extra_packages_missing_files(self): - staging_dir = tempfile.mkdtemp() - with self.assertRaises(RuntimeError) as cm: - - options = PipelineOptions() - 
options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz'] - - dependency.stage_job_resources(options) - self.assertEqual( - cm.exception.message, - 'The file %s cannot be found. It was specified in the ' - '--extra_packages command line option.' % 'nosuchfile.tar.gz') - - def test_with_extra_packages_invalid_file_name(self): - staging_dir = tempfile.mkdtemp() - source_dir = tempfile.mkdtemp() - self.create_temp_file( - os.path.join(source_dir, 'abc.tgz'), 'nothing') - with self.assertRaises(RuntimeError) as cm: - options = PipelineOptions() - options.view_as(GoogleCloudOptions).staging_location = staging_dir - self.update_options(options) - options.view_as(SetupOptions).extra_packages = [ - os.path.join(source_dir, 'abc.tgz')] - dependency.stage_job_resources(options) - self.assertEqual( - cm.exception.message, - 'The --extra_package option expects a full path ending with ".tar" or ' - '".tar.gz" instead of %s' % os.path.join(source_dir, 'abc.tgz')) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/profiler.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index 2a2df17..852b659 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -24,10 +24,8 @@ import pstats import StringIO import tempfile import time -from threading import Timer import warnings - -from apache_beam.utils.dependency import _dependency_file_copy +from threading import Timer class Profile(object): @@ -35,11 +33,13 @@ class Profile(object): SORTBY = 'cumulative' - def __init__(self, profile_id, profile_location=None, log_results=False): + def __init__(self, profile_id, profile_location=None, log_results=False, + file_copy_fn=None): self.stats = None self.profile_id = str(profile_id) self.profile_location = profile_location self.log_results = log_results + self.file_copy_fn = file_copy_fn def __enter__(self): logging.info('Start profiling: %s', self.profile_id) @@ -51,14 +51,14 @@ class Profile(object): self.profile.disable() logging.info('Stop profiling: %s', self.profile_id) - if self.profile_location: + if self.profile_location and self.file_copy_fn: dump_location = os.path.join( self.profile_location, 'profile', ('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id))) fd, filename = tempfile.mkstemp() self.profile.dump_stats(filename) logging.info('Copying profiler data to: [%s]', dump_location) - _dependency_file_copy(filename, dump_location) # pylint: disable=protected-access + self.file_copy_fn(filename, dump_location) # pylint: disable=protected-access os.close(fd) os.remove(filename)
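
For reference, below is a minimal, hypothetical sketch of how the refactored Profile class might be driven after this change: the copy helper is now injected by the caller instead of being imported from apache_beam.utils.dependency. The local_file_copy and do_work helpers are illustrative assumptions and are not part of this commit; a Dataflow pipeline would instead pass the runner's _dependency_file_copy from apache_beam.runners.dataflow.internal.dependency.

import os
import shutil

from apache_beam.utils.profiler import Profile


def local_file_copy(from_path, to_path):
  # Illustrative stand-in for a runner-provided copy helper. It only needs
  # to accept (from_path, to_path), which is how Profile calls file_copy_fn.
  to_dir = os.path.dirname(to_path)
  if not os.path.isdir(to_dir):
    os.makedirs(to_dir)
  shutil.copyfile(from_path, to_path)


def do_work():
  # Placeholder for the code being profiled.
  return sum(i * i for i in range(1000000))


# Profile is a context manager. With both profile_location and file_copy_fn
# set, the collected stats are dumped to a temp file on exit and copied to
# <profile_location>/profile/<timestamp>-<profile_id> via file_copy_fn.
with Profile(profile_id='example-step', profile_location='/tmp/profiles',
             log_results=True, file_copy_fn=local_file_copy):
  do_work()

Injecting the copy function this way keeps apache_beam.utils free of any dependency on the Dataflow runner package, which is the point of moving dependency.py under runners/dataflow in this commit.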