Repository: beam
Updated Branches:
  refs/heads/master 8ec890909 -> 00b395881


Move dependency file to dataflow runner directory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/821669da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/821669da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/821669da

Branch: refs/heads/master
Commit: 821669da21d2d785977428e1ecb41c641dda9852
Parents: 8ec8909
Author: Ahmet Altay <al...@google.com>
Authored: Wed Feb 22 19:08:16 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Feb 22 19:36:04 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py      |   6 +-
 .../runners/dataflow/internal/dependency.py     | 508 +++++++++++++++++++
 .../dataflow/internal/dependency_test.py        | 425 ++++++++++++++++
 sdks/python/apache_beam/utils/dependency.py     | 504 ------------------
 .../python/apache_beam/utils/dependency_test.py | 425 ----------------
 sdks/python/apache_beam/utils/profiler.py       |  12 +-
 6 files changed, 942 insertions(+), 938 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 0dab676..481ab70 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -34,13 +34,13 @@ from apache_beam import utils
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
+from apache_beam.runners.dataflow.internal.dependency import get_required_container_version
+from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.transforms import cy_combiners
 from apache_beam.transforms.display import DisplayData
-from apache_beam.utils import dependency
 from apache_beam.utils import retry
-from apache_beam.utils.dependency import get_required_container_version
-from apache_beam.utils.dependency import get_sdk_name_and_version
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.pipeline_options import DebugOptions
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
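
For code that still imports from the old location, the change amounts to
updating the module path. A minimal sketch (the caller below is hypothetical;
only the import paths come from this commit):

# Old: from apache_beam.utils.dependency import get_sdk_name_and_version
from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version

sdk_name, sdk_version = get_sdk_name_and_version()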

http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
new file mode 100644
index 0000000..6024332
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -0,0 +1,508 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other standard libraries. Dataflow relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/an_example_pypi_project/setuptools.html
+
+When a runner tries to run a pipeline it will check for a --requirements_file
+and a --setup_file option.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools and
+it will run 'python setup.py sdist' to produce a source distribution. The
+resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging
+location specified as job option. When a worker starts it will check for the
+presence of this file and will run 'easy_install tarball' to install the
+package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the GCS staging location.  When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze -r
+requirements.txt'. The reason a Dataflow runner does not run this automatically
+is because quite often only a small fraction of the dependencies present in a
+requirements.txt file are actually needed for remote execution and therefore a
+one-time manual trimming is desirable.
+
+TODO(silviuc): Staged files should have a job specific prefix.
+To prevent several jobs in the same project stomping on each other due to a
+shared staging location.
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+
+import glob
+import logging
+import os
+import re
+import shutil
+import sys
+import tempfile
+
+
+from apache_beam import utils
+from apache_beam import version as beam_version
+from apache_beam.internal import pickler
+from apache_beam.utils import names
+from apache_beam.utils import processes
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+
+
+# Update this version to the next version whenever there is a change that will
+# require changes to the execution environment.
+BEAM_CONTAINER_VERSION = 'beamhead'
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
+
+
+def _dependency_file_copy(from_path, to_path):
+  """Copies a local file to a GCS file or vice versa."""
+  logging.info('file copy from %s to %s.', from_path, to_path)
+  if from_path.startswith('gs://') or to_path.startswith('gs://'):
+    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
+    logging.info('Executing command: %s', command_args)
+    processes.check_call(command_args)
+  else:
+    # Branch used only for unit tests and integration tests.
+    # In such environments GCS support is not available.
+    if not os.path.isdir(os.path.dirname(to_path)):
+      logging.info('Created folder (since we have not done yet, and any errors '
+                   'will follow): %s ', os.path.dirname(to_path))
+      os.mkdir(os.path.dirname(to_path))
+    shutil.copyfile(from_path, to_path)
+
+
+def _dependency_file_download(from_url, to_folder):
+  """Downloads a file from a URL and returns path to the local file."""
+  # TODO(silviuc): We should cache downloads so we do not do it for every job.
+  try:
+    # We check if the file is actually there because wget returns a file
+    # even for a 404 response (file will contain the contents of the 404
+    # response).
+    response, content = __import__('httplib2').Http().request(from_url)
+    if int(response['status']) >= 400:
+      raise RuntimeError(
+          'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
+    local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
+    with open(local_download_file, 'w') as f:
+      f.write(content)
+  except Exception:
+    logging.info('Failed to download SDK from %s', from_url)
+    raise
+  return local_download_file
+
+
+def _stage_extra_packages(extra_packages, staging_location, temp_dir,
+                          file_copy=_dependency_file_copy):
+  """Stages a list of local extra packages.
+
+  Args:
+    extra_packages: Ordered list of local paths to extra packages to be staged.
+    staging_location: Staging location for the packages.
+    temp_dir: Temporary folder where the resource building can happen. Caller
+      is responsible for cleaning up this folder after this function returns.
+    file_copy: Callable for copying files. The default version will copy from
+      a local file to a GCS location using the gsutil tool available in the
+      Google Cloud SDK package.
+
+  Returns:
+    A list of file names (no paths) for the resources staged. All the files
+    are assumed to be staged in staging_location.
+
+  Raises:
+    RuntimeError: If files specified are not found or do not have expected
+      name patterns.
+  """
+  resources = []
+  staging_temp_dir = None
+  local_packages = []
+  for package in extra_packages:
+    if not (os.path.basename(package).endswith('.tar') or
+            os.path.basename(package).endswith('.tar.gz') or
+            os.path.basename(package).endswith('.whl')):
+      raise RuntimeError(
+          'The --extra_package option expects a full path ending with '
+          '".tar" or ".tar.gz" instead of %s' % package)
+    if os.path.basename(package).endswith('.whl'):
+      logging.warning(
+          'The .whl package "%s" is provided in --extra_package. '
+          'This functionality is not officially supported. Since wheel '
+          'packages are binary distributions, this package must be '
+          'binary-compatible with the worker environment (e.g. Python 2.7 '
+          'running on an x64 Linux host).', package)
+
+    if not os.path.isfile(package):
+      if package.startswith('gs://'):
+        if not staging_temp_dir:
+          staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
+        logging.info('Downloading extra package: %s locally before staging',
+                     package)
+        _dependency_file_copy(package, staging_temp_dir)
+      else:
+        raise RuntimeError(
+            'The file %s cannot be found. It was specified in the '
+            '--extra_packages command line option.' % package)
+    else:
+      local_packages.append(package)
+
+  if staging_temp_dir:
+    local_packages.extend(
+        [utils.path.join(staging_temp_dir, f) for f in os.listdir(
+            staging_temp_dir)])
+
+  for package in local_packages:
+    basename = os.path.basename(package)
+    staged_path = utils.path.join(staging_location, basename)
+    file_copy(package, staged_path)
+    resources.append(basename)
+  # Create a file containing the list of extra packages and stage it.
+  # The file is important so that in the worker the packages are installed
+  # exactly in the order specified. This approach will avoid extra PyPI
+  # requests. For example if package A depends on package B and package A
+  # is installed first then the installer will try to satisfy the
+  # dependency on B by downloading the package from PyPI. If package B is
+  # installed first this is avoided.
+  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
+    for package in local_packages:
+      f.write('%s\n' % os.path.basename(package))
+  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
+  # Note that the caller of this function is responsible for deleting the
+  # temporary folder where all temp files are created, including this one.
+  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
+  resources.append(EXTRA_PACKAGES_FILE)
+
+  return resources
+
+
+def _get_python_executable():
+  # Allow overriding the python executable to use for downloading and
+  # installing dependencies, otherwise use the python executable for
+  # the current process.
+  python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
+  if not python_bin:
+    raise ValueError('Could not find Python executable.')
+  return python_bin
+
+
+def _populate_requirements_cache(requirements_file, cache_dir):
+  # The 'pip download' command will not download again if it finds the
+  # tarball with the proper version already present.
+  # It will get the packages downloaded in the order they are presented in
+  # the requirements file and will not download package dependencies.
+  cmd_args = [
+      _get_python_executable(), '-m', 'pip', 'install', '--download', cache_dir,
+      '-r', requirements_file,
+      # Download from PyPI source distributions.
+      '--no-binary', ':all:']
+  logging.info('Executing command: %s', cmd_args)
+  processes.check_call(cmd_args)
+
+
+def stage_job_resources(
+    options, file_copy=_dependency_file_copy, build_setup_args=None,
+    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
+  """Creates (if needed) and stages job resources to options.staging_location.
+
+  Args:
+    options: Command line options. More specifically the function will expect
+      staging_location, requirements_file, setup_file, and save_main_session
+      options to be present.
+    file_copy: Callable for copying files. The default version will copy from
+      a local file to a GCS location using the gsutil tool available in the
+      Google Cloud SDK package.
+    build_setup_args: A list of command line arguments used to build a setup
+      package. Used only if options.setup_file is not None. Used only for
+      testing.
+    temp_dir: Temporary folder where the resource building can happen. If None
+      then a unique temp directory will be created. Used only for testing.
+    populate_requirements_cache: Callable for populating the requirements cache.
+      Used only for testing.
+
+  Returns:
+    A list of file names (no paths) for the resources staged. All the files
+    are assumed to be staged in options.staging_location.
+
+  Raises:
+    RuntimeError: If files specified are not found or error encountered while
+      trying to create the resources (e.g., build a setup package).
+  """
+  temp_dir = temp_dir or tempfile.mkdtemp()
+  resources = []
+
+  google_cloud_options = options.view_as(GoogleCloudOptions)
+  setup_options = options.view_as(SetupOptions)
+  # Make sure that all required options are specified. There are a few that have
+  # defaults to support local running scenarios.
+  if google_cloud_options.staging_location is None:
+    raise RuntimeError(
+        'The --staging_location option must be specified.')
+  if google_cloud_options.temp_location is None:
+    raise RuntimeError(
+        'The --temp_location option must be specified.')
+
+  # Stage a requirements file if present.
+  if setup_options.requirements_file is not None:
+    if not os.path.isfile(setup_options.requirements_file):
+      raise RuntimeError('The file %s cannot be found. It was specified in the '
+                         '--requirements_file command line option.' %
+                         setup_options.requirements_file)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  REQUIREMENTS_FILE)
+    file_copy(setup_options.requirements_file, staged_path)
+    resources.append(REQUIREMENTS_FILE)
+    requirements_cache_path = (
+        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+        if setup_options.requirements_cache is None
+        else setup_options.requirements_cache)
+    # Populate cache with packages from requirements and stage the files
+    # in the cache.
+    if not os.path.exists(requirements_cache_path):
+      os.makedirs(requirements_cache_path)
+    populate_requirements_cache(
+        setup_options.requirements_file, requirements_cache_path)
+    for pkg in  glob.glob(os.path.join(requirements_cache_path, '*')):
+      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
+                                     os.path.basename(pkg)))
+      resources.append(os.path.basename(pkg))
+
+  # Handle a setup file if present.
+  # We will build the setup package locally and then copy it to the staging
+  # location because the staging location is a GCS path and the file cannot be
+  # created directly there.
+  if setup_options.setup_file is not None:
+    if not os.path.isfile(setup_options.setup_file):
+      raise RuntimeError('The file %s cannot be found. It was specified in the '
+                         '--setup_file command line option.' %
+                         setup_options.setup_file)
+    if os.path.basename(setup_options.setup_file) != 'setup.py':
+      raise RuntimeError(
+          'The --setup_file option expects the full path to a file named '
+          'setup.py instead of %s' % setup_options.setup_file)
+    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
+                                        build_setup_args)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  WORKFLOW_TARBALL_FILE)
+    file_copy(tarball_file, staged_path)
+    resources.append(WORKFLOW_TARBALL_FILE)
+
+  # Handle extra local packages that should be staged.
+  if setup_options.extra_packages is not None:
+    resources.extend(
+        _stage_extra_packages(setup_options.extra_packages,
+                              google_cloud_options.staging_location,
+                              temp_dir=temp_dir, file_copy=file_copy))
+
+  # Pickle the main session if requested.
+  # We will create the pickled main session locally and then copy it to the
+  # staging location because the staging location is a GCS path and the file
+  # cannot be created directly there.
+  if setup_options.save_main_session:
+    pickled_session_file = os.path.join(temp_dir,
+                                        names.PICKLED_MAIN_SESSION_FILE)
+    pickler.dump_session(pickled_session_file)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  names.PICKLED_MAIN_SESSION_FILE)
+    file_copy(pickled_session_file, staged_path)
+    resources.append(names.PICKLED_MAIN_SESSION_FILE)
+
+  if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
+    if setup_options.sdk_location == 'default':
+      stage_tarball_from_remote_location = True
+    elif (setup_options.sdk_location.startswith('gs://') or
+          setup_options.sdk_location.startswith('http://') or
+          setup_options.sdk_location.startswith('https://')):
+      stage_tarball_from_remote_location = True
+    else:
+      stage_tarball_from_remote_location = False
+
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  names.DATAFLOW_SDK_TARBALL_FILE)
+    if stage_tarball_from_remote_location:
+      # If --sdk_location is not specified then the appropriate package
+      # will be obtained from PyPI (https://pypi.python.org) based on the
+      # version of the currently running SDK. If the option is
+      # present then no version matching is made and the exact URL or path
+      # is expected.
+      #
+      # Unit tests running in the 'python setup.py test' context will
+      # not have the sdk_location attribute present and therefore we
+      # will not stage a tarball.
+      if setup_options.sdk_location == 'default':
+        sdk_remote_location = 'pypi'
+      else:
+        sdk_remote_location = setup_options.sdk_location
+      _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
+      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+    else:
+      # Check if we have a local Dataflow SDK tarball present. This branch is
+      # used by tests running with the SDK built at head.
+      if setup_options.sdk_location == 'default':
+        module_path = os.path.abspath(__file__)
+        sdk_path = os.path.join(
+            os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE)
+      elif os.path.isdir(setup_options.sdk_location):
+        sdk_path = os.path.join(
+            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
+      else:
+        sdk_path = setup_options.sdk_location
+      if os.path.isfile(sdk_path):
+        logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path)
+        file_copy(sdk_path, staged_path)
+        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+      else:
+        if setup_options.sdk_location == 'default':
+          raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"'
+                             % sdk_path)
+        else:
+          raise RuntimeError(
+              'The file "%s" cannot be found. Its location was specified by '
+              'the --sdk_location command-line option.' %
+              sdk_path)
+
+  # Delete all temp files created while staging job resources.
+  shutil.rmtree(temp_dir)
+  return resources
+
+
+def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
+  saved_current_directory = os.getcwd()
+  try:
+    os.chdir(os.path.dirname(setup_file))
+    if build_setup_args is None:
+      build_setup_args = [
+          _get_python_executable(), os.path.basename(setup_file),
+          'sdist', '--dist-dir', temp_dir]
+    logging.info('Executing command: %s', build_setup_args)
+    processes.check_call(build_setup_args)
+    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
+    if not output_files:
+      raise RuntimeError(
+          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
+    return output_files[0]
+  finally:
+    os.chdir(saved_current_directory)
+
+
+def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
+  """Stage a Dataflow SDK tarball with the appropriate version.
+
+  Args:
+    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
+      which the file can be downloaded.
+    staged_path: GCS path where the found SDK tarball should be copied.
+    temp_dir: path to temporary location where the file should be downloaded.
+
+  Raises:
+    RuntimeError: If wget on the URL specified returns errors or the file
+      cannot be copied from/to GCS.
+  """
+  if (sdk_remote_location.startswith('http://') or
+      sdk_remote_location.startswith('https://')):
+    logging.info(
+        'Staging Dataflow SDK tarball from %s to %s',
+        sdk_remote_location, staged_path)
+    local_download_file = _dependency_file_download(
+        sdk_remote_location, temp_dir)
+    _dependency_file_copy(local_download_file, staged_path)
+  elif sdk_remote_location.startswith('gs://'):
+    # Stage the file to the GCS staging area.
+    logging.info(
+        'Staging Dataflow SDK tarball from %s to %s',
+        sdk_remote_location, staged_path)
+    _dependency_file_copy(sdk_remote_location, staged_path)
+  elif sdk_remote_location == 'pypi':
+    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
+  else:
+    raise RuntimeError(
+        'The --sdk_location option was used with an unsupported '
+        'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+  """Returns the Google Cloud Dataflow container version for remote execution.
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
+    # We drop any pre/post parts of the version and we keep only the X.Y.Z
+    # format.  For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+    container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release
+    # We do, however, keep the ".dev" suffix if it is present.
+    if re.match(r'.*\.dev[0-9]*$', version):
+      container_version += '.dev'
+    return container_version
+  except pkg.DistributionNotFound:
+    # This case covers Apache Beam end-to-end testing scenarios. All these tests
+    # will run with a special container version.
+    return BEAM_CONTAINER_VERSION
+
+
+def get_sdk_name_and_version():
+  """Returns name and version of SDK reported to Google Cloud Dataflow."""
+  # TODO(ccy): Make this check cleaner.
+  container_version = get_required_container_version()
+  if container_version == BEAM_CONTAINER_VERSION:
+    return ('Apache Beam SDK for Python', beam_version.__version__)
+  else:
+    return ('Google Cloud Dataflow SDK for Python', container_version)
+
+
+def _download_pypi_sdk_package(temp_dir):
+  """Downloads SDK package from PyPI and returns path to local path."""
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
+  except pkg.DistributionNotFound:
+    raise RuntimeError('Please set --sdk_location command-line option '
+                       'or install a valid {} distribution.'
+                       .format(GOOGLE_PACKAGE_NAME))
+
+  # Get a source distribution for the SDK package from PyPI.
+  cmd_args = [
+      _get_python_executable(), '-m', 'pip', 'install', '--download', temp_dir,
+      '%s==%s' % (GOOGLE_PACKAGE_NAME, version),
+      '--no-binary', ':all:', '--no-deps']
+  logging.info('Executing command: %s', cmd_args)
+  processes.check_call(cmd_args)
+  zip_expected = os.path.join(
+      temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
+  if os.path.exists(zip_expected):
+    return zip_expected
+  tgz_expected = os.path.join(
+      temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version))
+  if os.path.exists(tgz_expected):
+    return tgz_expected
+  raise RuntimeError(
+      'Failed to download a source distribution for the running SDK. Expected '
+      'either %s or %s to be found in the download folder.' % (
+          zip_expected, tgz_expected))
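
The module docstring above describes how --requirements_file and --setup_file
drive staging. A minimal sketch of the same flow invoked programmatically (not
part of the commit; bucket and file names are placeholders, and gs:// paths
assume gsutil from the Google Cloud SDK is available):

from apache_beam.runners.dataflow.internal import dependency
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions

options = PipelineOptions()
options.view_as(GoogleCloudOptions).staging_location = 'gs://my-bucket/staging'
options.view_as(GoogleCloudOptions).temp_location = 'gs://my-bucket/temp'
options.view_as(SetupOptions).requirements_file = 'requirements.txt'

# Returns the staged file names (requirements.txt plus the source
# distributions pip downloaded into the local requirements cache); depending
# on other options (e.g. sdk_location) an SDK tarball may be staged as well.
staged = dependency.stage_job_resources(options)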

http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
new file mode 100644
index 0000000..0657a07
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -0,0 +1,425 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the setup module."""
+
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+from apache_beam import utils
+from apache_beam.runners.dataflow.internal import dependency
+from apache_beam.utils import names
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+
+
+class SetupTest(unittest.TestCase):
+
+  def update_options(self, options):
+    setup_options = options.view_as(SetupOptions)
+    setup_options.sdk_location = ''
+    google_cloud_options = options.view_as(GoogleCloudOptions)
+    if google_cloud_options.temp_location is None:
+      google_cloud_options.temp_location = google_cloud_options.staging_location
+
+  def create_temp_file(self, path, contents):
+    with open(path, 'w') as f:
+      f.write(contents)
+      return f.name
+
+  def populate_requirements_cache(self, requirements_file, cache_dir):
+    _ = requirements_file
+    self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
+    self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+
+  def test_no_staging_location(self):
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(PipelineOptions())
+    self.assertEqual('The --staging_location option must be specified.',
+                     cm.exception.message)
+
+  def test_no_temp_location(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+    google_cloud_options = options.view_as(GoogleCloudOptions)
+    google_cloud_options.staging_location = staging_dir
+    self.update_options(options)
+    google_cloud_options.temp_location = None
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertEqual('The --temp_location option must be specified.',
+                     cm.exception.message)
+
+  def test_no_main_session(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    options.view_as(SetupOptions).save_main_session = False
+    self.update_options(options)
+
+    self.assertEqual(
+        [],
+        dependency.stage_job_resources(options))
+
+  def test_with_main_session(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    options.view_as(SetupOptions).save_main_session = True
+    self.update_options(options)
+
+    self.assertEqual(
+        [names.PICKLED_MAIN_SESSION_FILE],
+        dependency.stage_job_resources(options))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
+
+  def test_default_resources(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+
+    self.assertEqual(
+        [],
+        dependency.stage_job_resources(options))
+
+  def test_with_requirements_file(self):
+    try:
+      staging_dir = tempfile.mkdtemp()
+      requirements_cache_dir = tempfile.mkdtemp()
+      source_dir = tempfile.mkdtemp()
+
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+      options.view_as(SetupOptions).requirements_file = os.path.join(
+          source_dir, dependency.REQUIREMENTS_FILE)
+      self.create_temp_file(
+          os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+      self.assertEqual(
+          sorted([dependency.REQUIREMENTS_FILE,
+                  'abc.txt', 'def.txt']),
+          sorted(dependency.stage_job_resources(
+              options,
+              populate_requirements_cache=self.populate_requirements_cache)))
+      self.assertTrue(
+          os.path.isfile(
+              os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+      self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+      self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+    finally:
+      shutil.rmtree(staging_dir)
+      shutil.rmtree(requirements_cache_dir)
+      shutil.rmtree(source_dir)
+
+  def test_requirements_file_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).requirements_file = 'nosuchfile'
+      dependency.stage_job_resources(
+          options, populate_requirements_cache=self.populate_requirements_cache)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--requirements_file command line option.' % 'nosuchfile')
+
+  def test_with_requirements_file_and_cache(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, dependency.REQUIREMENTS_FILE)
+    options.view_as(SetupOptions).requirements_cache = os.path.join(
+        tempfile.gettempdir(), 'alternative-cache-dir')
+    self.create_temp_file(
+        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+    self.assertEqual(
+        sorted([dependency.REQUIREMENTS_FILE,
+                'abc.txt', 'def.txt']),
+        sorted(dependency.stage_job_resources(
+            options,
+            populate_requirements_cache=self.populate_requirements_cache)))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+
+  def test_with_setup_file(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'setup.py'), 'notused')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = os.path.join(
+        source_dir, 'setup.py')
+
+    self.assertEqual(
+        [dependency.WORKFLOW_TARBALL_FILE],
+        dependency.stage_job_resources(
+            options,
+            # We replace the build setup command because a realistic one would
+            # require the setuptools package to be installed. Note that we can't
+            # use "touch" here to create the expected output tarball file, since
+            # touch is not available on Windows, so we invoke python to produce
+            # equivalent behavior.
+            build_setup_args=[
+                'python', '-c', 'open(__import__("sys").argv[1], "a")',
+                os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
+            temp_dir=source_dir))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
+
+  def test_setup_file_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = 'nosuchfile'
+
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--setup_file command line option.' % 'nosuchfile')
+
+  def test_setup_file_not_named_setup_dot_py(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = (
+        os.path.join(source_dir, 'xyz-setup.py'))
+
+    self.create_temp_file(
+        os.path.join(source_dir, 'xyz-setup.py'), 'notused')
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertTrue(
+        cm.exception.message.startswith(
+            'The --setup_file option expects the full path to a file named '
+            'setup.py instead of '))
+
+  def override_file_copy(self, expected_from_path, expected_to_dir):
+    def file_copy(from_path, to_path):
+      if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
+        self.assertEqual(expected_from_path, from_path)
+        self.assertEqual(utils.path.join(expected_to_dir,
+                                         names.DATAFLOW_SDK_TARBALL_FILE),
+                         to_path)
+      if from_path.startswith('gs://') or to_path.startswith('gs://'):
+        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+      else:
+        shutil.copyfile(from_path, to_path)
+    dependency._dependency_file_copy = file_copy
+
+  def override_file_download(self, expected_from_url, expected_to_folder):
+    def file_download(from_url, _):
+      self.assertEqual(expected_from_url, from_url)
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._dependency_file_download = file_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
+  def override_pypi_download(self, expected_from_url, expected_to_folder):
+    def pypi_download(_):
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._download_pypi_sdk_package = pypi_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
+  def test_sdk_location_default(self):
+    staging_dir = tempfile.mkdtemp()
+    expected_from_url = 'pypi'
+    expected_from_path = self.override_pypi_download(
+        expected_from_url, staging_dir)
+    self.override_file_copy(expected_from_path, staging_dir)
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = 'default'
+
+    self.assertEqual(
+        [names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(
+            options,
+            file_copy=dependency._dependency_file_copy))
+
+  def test_sdk_location_local(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(
+            sdk_location,
+            names.DATAFLOW_SDK_TARBALL_FILE),
+        'contents')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = sdk_location
+
+    self.assertEqual(
+        [names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(options))
+    tarball_path = os.path.join(
+        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    with open(tarball_path) as f:
+      self.assertEqual(f.read(), 'contents')
+
+  def test_sdk_location_local_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = 'nosuchdir'
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).sdk_location = sdk_location
+
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        'The file "%s" cannot be found. Its '
+        'location was specified by the --sdk_location command-line option.' %
+        sdk_location,
+        cm.exception.message)
+
+  def test_sdk_location_gcs(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
+    self.override_file_copy(sdk_location, staging_dir)
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = sdk_location
+
+    self.assertEqual(
+        [names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(options))
+
+  def test_with_extra_packages(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, 'xyz2.tar'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, 'whl.whl'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).extra_packages = [
+        os.path.join(source_dir, 'abc.tar.gz'),
+        os.path.join(source_dir, 'xyz.tar.gz'),
+        os.path.join(source_dir, 'xyz2.tar'),
+        os.path.join(source_dir, 'whl.whl'),
+        'gs://my-gcs-bucket/gcs.tar.gz']
+
+    gcs_copied_files = []
+
+    def file_copy(from_path, to_path):
+      if from_path.startswith('gs://'):
+        gcs_copied_files.append(from_path)
+        _, from_name = os.path.split(from_path)
+        self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
+        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
+      elif to_path.startswith('gs://'):
+        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+      else:
+        shutil.copyfile(from_path, to_path)
+
+    dependency._dependency_file_copy = file_copy
+
+    self.assertEqual(
+        ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz',
+         dependency.EXTRA_PACKAGES_FILE],
+        dependency.stage_job_resources(options))
+    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
+      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n',
+                        'whl.whl\n', 'gcs.tar.gz\n'], f.readlines())
+    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
+
+  def test_with_extra_packages_missing_files(self):
+    staging_dir = tempfile.mkdtemp()
+    with self.assertRaises(RuntimeError) as cm:
+
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
+
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--extra_packages command line option.' % 'nosuchfile.tar.gz')
+
+  def test_with_extra_packages_invalid_file_name(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'abc.tgz'), 'nothing')
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).extra_packages = [
+          os.path.join(source_dir, 'abc.tgz')]
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The --extra_package option expects a full path ending with ".tar" or '
+        '".tar.gz" instead of %s' % os.path.join(source_dir, 'abc.tgz'))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
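
The tests above avoid shelling out to gsutil and pip by overriding module
functions and by passing the file_copy and populate_requirements_cache
callables that stage_job_resources() accepts. A self-contained sketch of that
injection pattern (not part of the commit; it mirrors test_with_main_session
using a throwaway local directory):

import shutil
import tempfile

from apache_beam.runners.dataflow.internal import dependency
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions


def local_file_copy(from_path, to_path):
  # Plain local copy instead of 'gsutil cp'.
  shutil.copyfile(from_path, to_path)


staging_dir = tempfile.mkdtemp()
options = PipelineOptions()
options.view_as(GoogleCloudOptions).staging_location = staging_dir
options.view_as(GoogleCloudOptions).temp_location = staging_dir
options.view_as(SetupOptions).sdk_location = ''  # skip SDK tarball staging
options.view_as(SetupOptions).save_main_session = True

# Expected to return [names.PICKLED_MAIN_SESSION_FILE] and place that file
# under staging_dir.
print(dependency.stage_job_resources(options, file_copy=local_file_copy))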

http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
deleted file mode 100644
index 11a2e1c..0000000
--- a/sdks/python/apache_beam/utils/dependency.py
+++ /dev/null
@@ -1,504 +0,0 @@
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Support for installing custom code and required dependencies.
-
-Workflows, with the exception of very simple ones, are organized in multiple
-modules and packages. Typically, these modules and packages have
-dependencies on other standard libraries. Dataflow relies on the Python
-setuptools package to handle these scenarios. For further details please read:
-https://pythonhosted.org/an_example_pypi_project/setuptools.html
-
-When a runner tries to run a pipeline it will check for a --requirements_file
-and a --setup_file option.
-
-If --setup_file is present then it is assumed that the folder containing the
-file specified by the option has the typical layout required by setuptools and
-it will run 'python setup.py sdist' to produce a source distribution. The
-resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging
-location specified as job option. When a worker starts it will check for the
-presence of this file and will run 'easy_install tarball' to install the
-package in the worker.
-
-If --requirements_file is present then the file specified by the option will be
-staged in the GCS staging location.  When a worker starts it will check for the
-presence of this file and will run 'pip install -r requirements.txt'. A
-requirements file can be easily generated by running 'pip freeze -r
-requirements.txt'. The reason a Dataflow runner does not run this automatically
-is because quite often only a small fraction of the dependencies present in a
-requirements.txt file are actually needed for remote execution and therefore a
-one-time manual trimming is desirable.
-
-TODO(silviuc): Staged files should have a job specific prefix.
-To prevent several jobs in the same project stomping on each other due to a
-shared staging location.
-
-TODO(silviuc): Should we allow several setup packages?
-TODO(silviuc): We should allow customizing the exact command for setup build.
-"""
-
-import glob
-import logging
-import os
-import re
-import shutil
-import sys
-import tempfile
-
-
-from apache_beam import utils
-from apache_beam import version as beam_version
-from apache_beam.internal import pickler
-from apache_beam.utils import names
-from apache_beam.utils import processes
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import SetupOptions
-
-
-# Standard file names used for staging files.
-WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
-REQUIREMENTS_FILE = 'requirements.txt'
-EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-
-GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
-
-
-def _dependency_file_copy(from_path, to_path):
-  """Copies a local file to a GCS file or vice versa."""
-  logging.info('file copy from %s to %s.', from_path, to_path)
-  if from_path.startswith('gs://') or to_path.startswith('gs://'):
-    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
-    logging.info('Executing command: %s', command_args)
-    processes.check_call(command_args)
-  else:
-    # Branch used only for unit tests and integration tests.
-    # In such environments GCS support is not available.
-    if not os.path.isdir(os.path.dirname(to_path)):
-      logging.info('Created folder (since we have not done yet, and any errors '
-                   'will follow): %s ', os.path.dirname(to_path))
-      os.mkdir(os.path.dirname(to_path))
-    shutil.copyfile(from_path, to_path)
-
-
-def _dependency_file_download(from_url, to_folder):
-  """Downloads a file from a URL and returns path to the local file."""
-  # TODO(silviuc): We should cache downloads so we do not do it for every job.
-  try:
-    # We check if the file is actually there because wget returns a file
-    # even for a 404 response (file will contain the contents of the 404
-    # response).
-    response, content = __import__('httplib2').Http().request(from_url)
-    if int(response['status']) >= 400:
-      raise RuntimeError(
-          'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
-    local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
-    with open(local_download_file, 'w') as f:
-      f.write(content)
-  except Exception:
-    logging.info('Failed to download SDK from %s', from_url)
-    raise
-  return local_download_file
-
-
-def _stage_extra_packages(extra_packages, staging_location, temp_dir,
-                          file_copy=_dependency_file_copy):
-  """Stages a list of local extra packages.
-
-  Args:
-    extra_packages: Ordered list of local paths to extra packages to be staged.
-    staging_location: Staging location for the packages.
-    temp_dir: Temporary folder where the resource building can happen. Caller
-      is responsible for cleaning up this folder after this function returns.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or do not have expected
-      name patterns.
-  """
-  resources = []
-  staging_temp_dir = None
-  local_packages = []
-  for package in extra_packages:
-    if not (os.path.basename(package).endswith('.tar') or
-            os.path.basename(package).endswith('.tar.gz') or
-            os.path.basename(package).endswith('.whl')):
-      raise RuntimeError(
-          'The --extra_package option expects a full path ending with '
-          '".tar" or ".tar.gz" instead of %s' % package)
-    if os.path.basename(package).endswith('.whl'):
-      logging.warning(
-          'The .whl package "%s" is provided in --extra_package. '
-          'This functionality is not officially supported. Since wheel '
-          'packages are binary distributions, this package must be '
-          'binary-compatible with the worker environment (e.g. Python 2.7 '
-          'running on an x64 Linux host).')
-
-    if not os.path.isfile(package):
-      if package.startswith('gs://'):
-        if not staging_temp_dir:
-          staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
-        logging.info('Downloading extra package: %s locally before staging',
-                     package)
-        _dependency_file_copy(package, staging_temp_dir)
-      else:
-        raise RuntimeError(
-            'The file %s cannot be found. It was specified in the '
-            '--extra_packages command line option.' % package)
-    else:
-      local_packages.append(package)
-
-  if staging_temp_dir:
-    local_packages.extend(
-        [utils.path.join(staging_temp_dir, f) for f in os.listdir(
-            staging_temp_dir)])
-
-  for package in local_packages:
-    basename = os.path.basename(package)
-    staged_path = utils.path.join(staging_location, basename)
-    file_copy(package, staged_path)
-    resources.append(basename)
-  # Create a file containing the list of extra packages and stage it.
-  # The file is important so that in the worker the packages are installed
-  # exactly in the order specified. This approach will avoid extra PyPI
-  # requests. For example if package A depends on package B and package A
-  # is installed first then the installer will try to satisfy the
-  # dependency on B by downloading the package from PyPI. If package B is
-  # installed first this is avoided.
-  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
-    for package in local_packages:
-      f.write('%s\n' % os.path.basename(package))
-  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
-  # Note that the caller of this function is responsible for deleting the
-  # temporary folder where all temp files are created, including this one.
-  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
-  resources.append(EXTRA_PACKAGES_FILE)
-
-  return resources
-
-
-def _get_python_executable():
-  # Allow overriding the python executable to use for downloading and
-  # installing dependencies, otherwise use the python executable for
-  # the current process.
-  python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
-  if not python_bin:
-    raise ValueError('Could not find Python executable.')
-  return python_bin
-
-
-def _populate_requirements_cache(requirements_file, cache_dir):
-  # The 'pip download' command will not download again if it finds the
-  # tarball with the proper version already present.
-  # It will get the packages downloaded in the order they are presented in
-  # the requirements file and will not download package dependencies.
-  cmd_args = [
-      _get_python_executable(), '-m', 'pip', 'install', '--download', cache_dir,
-      '-r', requirements_file,
-      # Download from PyPI source distributions.
-      '--no-binary', ':all:']
-  logging.info('Executing command: %s', cmd_args)
-  processes.check_call(cmd_args)
-
-
-def stage_job_resources(
-    options, file_copy=_dependency_file_copy, build_setup_args=None,
-    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
-  """Creates (if needed) and stages job resources to options.staging_location.
-
-  Args:
-    options: Command line options. More specifically the function will expect
-      staging_location, requirements_file, setup_file, and save_main_session
-      options to be present.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-    build_setup_args: A list of command line arguments used to build a setup
-      package. Used only if options.setup_file is not None. Used only for
-      testing.
-    temp_dir: Temporary folder where the resource building can happen. If None
-      then a unique temp directory will be created. Used only for testing.
-    populate_requirements_cache: Callable for populating the requirements cache.
-      Used only for testing.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in options.staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or error encountered while
-      trying to create the resources (e.g., build a setup package).
-  """
-  temp_dir = temp_dir or tempfile.mkdtemp()
-  resources = []
-
-  google_cloud_options = options.view_as(GoogleCloudOptions)
-  setup_options = options.view_as(SetupOptions)
-  # Make sure that all required options are specified. There are a few that have
-  # defaults to support local running scenarios.
-  if google_cloud_options.staging_location is None:
-    raise RuntimeError(
-        'The --staging_location option must be specified.')
-  if google_cloud_options.temp_location is None:
-    raise RuntimeError(
-        'The --temp_location option must be specified.')
-
-  # Stage a requirements file if present.
-  if setup_options.requirements_file is not None:
-    if not os.path.isfile(setup_options.requirements_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--requirements_file command line option.' %
-                         setup_options.requirements_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  REQUIREMENTS_FILE)
-    file_copy(setup_options.requirements_file, staged_path)
-    resources.append(REQUIREMENTS_FILE)
-    requirements_cache_path = (
-        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-        if setup_options.requirements_cache is None
-        else setup_options.requirements_cache)
-    # Populate cache with packages from requirements and stage the files
-    # in the cache.
-    if not os.path.exists(requirements_cache_path):
-      os.makedirs(requirements_cache_path)
-    populate_requirements_cache(
-        setup_options.requirements_file, requirements_cache_path)
-    for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
-      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
-                                     os.path.basename(pkg)))
-      resources.append(os.path.basename(pkg))
-
-  # Handle a setup file if present.
-  # We will build the setup package locally and then copy it to the staging
-  # location because the staging location is a GCS path and the file cannot be
-  # created directly there.
-  if setup_options.setup_file is not None:
-    if not os.path.isfile(setup_options.setup_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--setup_file command line option.' %
-                         setup_options.setup_file)
-    if os.path.basename(setup_options.setup_file) != 'setup.py':
-      raise RuntimeError(
-          'The --setup_file option expects the full path to a file named '
-          'setup.py instead of %s' % setup_options.setup_file)
-    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
-                                        build_setup_args)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  WORKFLOW_TARBALL_FILE)
-    file_copy(tarball_file, staged_path)
-    resources.append(WORKFLOW_TARBALL_FILE)
-
-  # Handle extra local packages that should be staged.
-  if setup_options.extra_packages is not None:
-    resources.extend(
-        _stage_extra_packages(setup_options.extra_packages,
-                              google_cloud_options.staging_location,
-                              temp_dir=temp_dir, file_copy=file_copy))
-
-  # Pickle the main session if requested.
-  # We will create the pickled main session locally and then copy it to the
-  # staging location because the staging location is a GCS path and the file
-  # cannot be created directly there.
-  if setup_options.save_main_session:
-    pickled_session_file = os.path.join(temp_dir,
-                                        names.PICKLED_MAIN_SESSION_FILE)
-    pickler.dump_session(pickled_session_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  names.PICKLED_MAIN_SESSION_FILE)
-    file_copy(pickled_session_file, staged_path)
-    resources.append(names.PICKLED_MAIN_SESSION_FILE)
-
-  if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
-    if setup_options.sdk_location == 'default':
-      stage_tarball_from_remote_location = True
-    elif (setup_options.sdk_location.startswith('gs://') or
-          setup_options.sdk_location.startswith('http://') or
-          setup_options.sdk_location.startswith('https://')):
-      stage_tarball_from_remote_location = True
-    else:
-      stage_tarball_from_remote_location = False
-
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  names.DATAFLOW_SDK_TARBALL_FILE)
-    if stage_tarball_from_remote_location:
-      # If --sdk_location is not specified then the appropriate package
-      # will be obtained from PyPI (https://pypi.python.org) based on the
-      # version of the currently running SDK. If the option is
-      # present then no version matching is made and the exact URL or path
-      # is expected.
-      #
-      # Unit tests running in the 'python setup.py test' context will
-      # not have the sdk_location attribute present and therefore we
-      # will not stage a tarball.
-      if setup_options.sdk_location == 'default':
-        sdk_remote_location = 'pypi'
-      else:
-        sdk_remote_location = setup_options.sdk_location
-      _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
-      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
-    else:
-      # Check if we have a local Dataflow SDK tarball present. This branch is
-      # used by tests running with the SDK built at head.
-      if setup_options.sdk_location == 'default':
-        module_path = os.path.abspath(__file__)
-        sdk_path = os.path.join(
-            os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE)
-      elif os.path.isdir(setup_options.sdk_location):
-        sdk_path = os.path.join(
-            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
-      else:
-        sdk_path = setup_options.sdk_location
-      if os.path.isfile(sdk_path):
-        logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path)
-        file_copy(sdk_path, staged_path)
-        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
-      else:
-        if setup_options.sdk_location == 'default':
-          raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"',
-                             sdk_path)
-        else:
-          raise RuntimeError(
-              'The file "%s" cannot be found. Its location was specified by '
-              'the --sdk_location command-line option.' %
-              sdk_path)
-
-  # Delete all temp files created while staging job resources.
-  shutil.rmtree(temp_dir)
-  return resources
-
-
-def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
-  saved_current_directory = os.getcwd()
-  try:
-    os.chdir(os.path.dirname(setup_file))
-    if build_setup_args is None:
-      build_setup_args = [
-          _get_python_executable(), os.path.basename(setup_file),
-          'sdist', '--dist-dir', temp_dir]
-    logging.info('Executing command: %s', build_setup_args)
-    processes.check_call(build_setup_args)
-    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
-    if not output_files:
-      raise RuntimeError(
-          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
-    return output_files[0]
-  finally:
-    os.chdir(saved_current_directory)
-
-
-def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
-  """Stage a Dataflow SDK tarball with the appropriate version.
-
-  Args:
-    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
-      which the file can be downloaded.
-    staged_path: GCS path where the found SDK tarball should be copied.
-    temp_dir: path to temporary location where the file should be downloaded.
-
-  Raises:
-    RuntimeError: If wget on the URL specified returns errors or the file
-      cannot be copied from/to GCS.
-  """
-  if (sdk_remote_location.startswith('http://') or
-      sdk_remote_location.startswith('https://')):
-    logging.info(
-        'Staging Dataflow SDK tarball from %s to %s',
-        sdk_remote_location, staged_path)
-    local_download_file = _dependency_file_download(
-        sdk_remote_location, temp_dir)
-    _dependency_file_copy(local_download_file, staged_path)
-  elif sdk_remote_location.startswith('gs://'):
-    # Stage the file to the GCS staging area.
-    logging.info(
-        'Staging Dataflow SDK tarball from %s to %s',
-        sdk_remote_location, staged_path)
-    _dependency_file_copy(sdk_remote_location, staged_path)
-  elif sdk_remote_location == 'pypi':
-    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
-    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
-  else:
-    raise RuntimeError(
-        'The --sdk_location option was used with an unsupported '
-        'type of location: %s' % sdk_remote_location)
-
-
-def get_required_container_version():
-  """Returns the Google Cloud Dataflow container version for remote execution.
-  """
-  # TODO(silviuc): Handle apache-beam versions when we have official releases.
-  import pkg_resources as pkg
-  try:
-    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
-    # We drop any pre/post parts of the version and we keep only the X.Y.Z
-    # format.  For instance the 0.3.0rc2 SDK version translates into 0.3.0.
-    container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release
-    # We do, however, keep the ".dev" suffix if it is present.
-    if re.match(r'.*\.dev[0-9]*$', version):
-      container_version += '.dev'
-    return container_version
-  except pkg.DistributionNotFound:
-    # This case covers Apache Beam end-to-end testing scenarios. All these tests
-    # will run with a special container version.
-    return 'beamhead'
-
-
-def get_sdk_name_and_version():
-  """Returns name and version of SDK reported to Google Cloud Dataflow."""
-  # TODO(ccy): Make this check cleaner.
-  container_version = get_required_container_version()
-  if container_version == 'beamhead':
-    return ('Apache Beam SDK for Python', beam_version.__version__)
-  else:
-    return ('Google Cloud Dataflow SDK for Python', container_version)
-
-
-def _download_pypi_sdk_package(temp_dir):
-  """Downloads SDK package from PyPI and returns path to local path."""
-  # TODO(silviuc): Handle apache-beam versions when we have official releases.
-  import pkg_resources as pkg
-  try:
-    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
-  except pkg.DistributionNotFound:
-    raise RuntimeError('Please set --sdk_location command-line option '
-                       'or install a valid {} distribution.'
-                       .format(GOOGLE_PACKAGE_NAME))
-
-  # Get a source distribution for the SDK package from PyPI.
-  cmd_args = [
-      _get_python_executable(), '-m', 'pip', 'install', '--download', temp_dir,
-      '%s==%s' % (GOOGLE_PACKAGE_NAME, version),
-      '--no-binary', ':all:', '--no-deps']
-  logging.info('Executing command: %s', cmd_args)
-  processes.check_call(cmd_args)
-  zip_expected = os.path.join(
-      temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
-  if os.path.exists(zip_expected):
-    return zip_expected
-  tgz_expected = os.path.join(
-      temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version))
-  if os.path.exists(tgz_expected):
-    return tgz_expected
-  raise RuntimeError(
-      'Failed to download a source distribution for the running SDK. Expected '
-      'either %s or %s to be found in the download folder.' % (
-          zip_expected, tgz_expected))
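
A minimal sketch of how the staging entry point above is typically driven from
pipeline options; the bucket paths below are illustrative only, and the default
file_copy assumes gsutil/GCS access:

    from apache_beam.runners.dataflow.internal import dependency
    from apache_beam.utils.pipeline_options import GoogleCloudOptions
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    options = PipelineOptions()
    # Both locations are required; stage_job_resources raises RuntimeError
    # if either one is missing.
    options.view_as(GoogleCloudOptions).staging_location = 'gs://my-bucket/staging'
    options.view_as(GoogleCloudOptions).temp_location = 'gs://my-bucket/temp'
    options.view_as(SetupOptions).save_main_session = True

    # Returns the basenames of everything staged to staging_location.
    staged = dependency.stage_job_resources(options)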

http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
deleted file mode 100644
index 75a89e2..0000000
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ /dev/null
@@ -1,425 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the setup module."""
-
-import logging
-import os
-import shutil
-import tempfile
-import unittest
-
-from apache_beam import utils
-from apache_beam.utils import dependency
-from apache_beam.utils import names
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
-
-
-class SetupTest(unittest.TestCase):
-
-  def update_options(self, options):
-    setup_options = options.view_as(SetupOptions)
-    setup_options.sdk_location = ''
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    if google_cloud_options.temp_location is None:
-      google_cloud_options.temp_location = google_cloud_options.staging_location
-
-  def create_temp_file(self, path, contents):
-    with open(path, 'w') as f:
-      f.write(contents)
-      return f.name
-
-  def populate_requirements_cache(self, requirements_file, cache_dir):
-    _ = requirements_file
-    self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
-    self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
-
-  def test_no_staging_location(self):
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(PipelineOptions())
-    self.assertEqual('The --staging_location option must be specified.',
-                     cm.exception.message)
-
-  def test_no_temp_location(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    google_cloud_options.staging_location = staging_dir
-    self.update_options(options)
-    google_cloud_options.temp_location = None
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertEqual('The --temp_location option must be specified.',
-                     cm.exception.message)
-
-  def test_no_main_session(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    options.view_as(SetupOptions).save_main_session = False
-    self.update_options(options)
-
-    self.assertEqual(
-        [],
-        dependency.stage_job_resources(options))
-
-  def test_with_main_session(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    options.view_as(SetupOptions).save_main_session = True
-    self.update_options(options)
-
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE],
-        dependency.stage_job_resources(options))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
-
-  def test_default_resources(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-
-    self.assertEqual(
-        [],
-        dependency.stage_job_resources(options))
-
-  def test_with_requirements_file(self):
-    try:
-      staging_dir = tempfile.mkdtemp()
-      requirements_cache_dir = tempfile.mkdtemp()
-      source_dir = tempfile.mkdtemp()
-
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
-      options.view_as(SetupOptions).requirements_file = os.path.join(
-          source_dir, dependency.REQUIREMENTS_FILE)
-      self.create_temp_file(
-          os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-      self.assertEqual(
-          sorted([dependency.REQUIREMENTS_FILE,
-                  'abc.txt', 'def.txt']),
-          sorted(dependency.stage_job_resources(
-              options,
-              populate_requirements_cache=self.populate_requirements_cache)))
-      self.assertTrue(
-          os.path.isfile(
-              os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-      self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-      self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-    finally:
-      shutil.rmtree(staging_dir)
-      shutil.rmtree(requirements_cache_dir)
-      shutil.rmtree(source_dir)
-
-  def test_requirements_file_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).requirements_file = 'nosuchfile'
-      dependency.stage_job_resources(
-          options, populate_requirements_cache=self.populate_requirements_cache)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--requirements_file command line option.' % 'nosuchfile')
-
-  def test_with_requirements_file_and_cache(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).requirements_file = os.path.join(
-        source_dir, dependency.REQUIREMENTS_FILE)
-    options.view_as(SetupOptions).requirements_cache = os.path.join(
-        tempfile.gettempdir(), 'alternative-cache-dir')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-    self.assertEqual(
-        sorted([dependency.REQUIREMENTS_FILE,
-                'abc.txt', 'def.txt']),
-        sorted(dependency.stage_job_resources(
-            options,
-            populate_requirements_cache=self.populate_requirements_cache)))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-
-  def test_with_setup_file(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'setup.py'), 'notused')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = os.path.join(
-        source_dir, 'setup.py')
-
-    self.assertEqual(
-        [dependency.WORKFLOW_TARBALL_FILE],
-        dependency.stage_job_resources(
-            options,
-            # We replace the build setup command because a realistic one would
-            # require the setuptools package to be installed. Note that we can't
-            # use "touch" here to create the expected output tarball file, since
-            # touch is not available on Windows, so we invoke python to produce
-            # equivalent behavior.
-            build_setup_args=[
-                'python', '-c', 'open(__import__("sys").argv[1], "a")',
-                os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
-            temp_dir=source_dir))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
-
-  def test_setup_file_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = 'nosuchfile'
-
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--setup_file command line option.' % 'nosuchfile')
-
-  def test_setup_file_not_named_setup_dot_py(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = (
-        os.path.join(source_dir, 'xyz-setup.py'))
-
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz-setup.py'), 'notused')
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertTrue(
-        cm.exception.message.startswith(
-            'The --setup_file option expects the full path to a file named '
-            'setup.py instead of '))
-
-  def override_file_copy(self, expected_from_path, expected_to_dir):
-    def file_copy(from_path, to_path):
-      if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
-        self.assertEqual(expected_from_path, from_path)
-        self.assertEqual(utils.path.join(expected_to_dir,
-                                         names.DATAFLOW_SDK_TARBALL_FILE),
-                         to_path)
-      if from_path.startswith('gs://') or to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
-      else:
-        shutil.copyfile(from_path, to_path)
-    dependency._dependency_file_copy = file_copy
-
-  def override_file_download(self, expected_from_url, expected_to_folder):
-    def file_download(from_url, _):
-      self.assertEqual(expected_from_url, from_url)
-      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
-      with open(tarball_path, 'w') as f:
-        f.write('Some contents.')
-      return tarball_path
-    dependency._dependency_file_download = file_download
-    return os.path.join(expected_to_folder, 'sdk-tarball')
-
-  def override_pypi_download(self, expected_from_url, expected_to_folder):
-    def pypi_download(_):
-      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
-      with open(tarball_path, 'w') as f:
-        f.write('Some contents.')
-      return tarball_path
-    dependency._download_pypi_sdk_package = pypi_download
-    return os.path.join(expected_to_folder, 'sdk-tarball')
-
-  def test_sdk_location_default(self):
-    staging_dir = tempfile.mkdtemp()
-    expected_from_url = 'pypi'
-    expected_from_path = self.override_pypi_download(
-        expected_from_url, staging_dir)
-    self.override_file_copy(expected_from_path, staging_dir)
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = 'default'
-
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(
-            options,
-            file_copy=dependency._dependency_file_copy))
-
-  def test_sdk_location_local(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(
-            sdk_location,
-            names.DATAFLOW_SDK_TARBALL_FILE),
-        'contents')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = sdk_location
-
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
-    with open(tarball_path) as f:
-      self.assertEqual(f.read(), 'contents')
-
-  def test_sdk_location_local_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = 'nosuchdir'
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).sdk_location = sdk_location
-
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        'The file "%s" cannot be found. Its '
-        'location was specified by the --sdk_location command-line option.' %
-        sdk_location,
-        cm.exception.message)
-
-  def test_sdk_location_gcs(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
-    self.override_file_copy(sdk_location, staging_dir)
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = sdk_location
-
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-
-  def test_with_extra_packages(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz2.tar'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'whl.whl'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).extra_packages = [
-        os.path.join(source_dir, 'abc.tar.gz'),
-        os.path.join(source_dir, 'xyz.tar.gz'),
-        os.path.join(source_dir, 'xyz2.tar'),
-        os.path.join(source_dir, 'whl.whl'),
-        'gs://my-gcs-bucket/gcs.tar.gz']
-
-    gcs_copied_files = []
-
-    def file_copy(from_path, to_path):
-      if from_path.startswith('gs://'):
-        gcs_copied_files.append(from_path)
-        _, from_name = os.path.split(from_path)
-        self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
-        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
-      elif to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
-      else:
-        shutil.copyfile(from_path, to_path)
-
-    dependency._dependency_file_copy = file_copy
-
-    self.assertEqual(
-        ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz',
-         dependency.EXTRA_PACKAGES_FILE],
-        dependency.stage_job_resources(options))
-    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
-      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n',
-                        'whl.whl\n', 'gcs.tar.gz\n'], f.readlines())
-    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
-
-  def test_with_extra_packages_missing_files(self):
-    staging_dir = tempfile.mkdtemp()
-    with self.assertRaises(RuntimeError) as cm:
-
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
-
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--extra_packages command line option.' % 'nosuchfile.tar.gz')
-
-  def test_with_extra_packages_invalid_file_name(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'abc.tgz'), 'nothing')
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).extra_packages = [
-          os.path.join(source_dir, 'abc.tgz')]
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The --extra_package option expects a full path ending with ".tar" or '
-        '".tar.gz" instead of %s' % os.path.join(source_dir, 'abc.tgz'))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()
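
The tests above exercise stage_job_resources purely through injected
collaborators (file_copy, populate_requirements_cache). A condensed sketch of
that pattern outside of unittest; the fake package name and the throwaway temp
directories are made up for illustration:

    import os
    import shutil
    import tempfile

    from apache_beam.runners.dataflow.internal import dependency
    from apache_beam.utils.pipeline_options import GoogleCloudOptions
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    def local_file_copy(from_path, to_path):
      # Stand-in for the gsutil-based default copy: plain local copy.
      shutil.copyfile(from_path, to_path)

    def fake_populate_cache(requirements_file, cache_dir):
      # Pretend 'pip install --download' fetched one package into the cache.
      open(os.path.join(cache_dir, 'pkg-1.0.tar.gz'), 'w').close()

    staging_dir = tempfile.mkdtemp()
    source_dir = tempfile.mkdtemp()
    req_file = os.path.join(source_dir, dependency.REQUIREMENTS_FILE)
    open(req_file, 'w').close()

    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).staging_location = staging_dir
    options.view_as(GoogleCloudOptions).temp_location = staging_dir
    options.view_as(SetupOptions).sdk_location = ''  # skip SDK tarball staging
    options.view_as(SetupOptions).requirements_file = req_file
    options.view_as(SetupOptions).requirements_cache = tempfile.mkdtemp()

    print(dependency.stage_job_resources(
        options,
        file_copy=local_file_copy,
        populate_requirements_cache=fake_populate_cache))
    # -> [dependency.REQUIREMENTS_FILE, 'pkg-1.0.tar.gz']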

http://git-wip-us.apache.org/repos/asf/beam/blob/821669da/sdks/python/apache_beam/utils/profiler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 2a2df17..852b659 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -24,10 +24,8 @@ import pstats
 import StringIO
 import tempfile
 import time
-from threading import Timer
 import warnings
-
-from apache_beam.utils.dependency import _dependency_file_copy
+from threading import Timer
 
 
 class Profile(object):
@@ -35,11 +33,13 @@ class Profile(object):
 
   SORTBY = 'cumulative'
 
-  def __init__(self, profile_id, profile_location=None, log_results=False):
+  def __init__(self, profile_id, profile_location=None, log_results=False,
+               file_copy_fn=None):
     self.stats = None
     self.profile_id = str(profile_id)
     self.profile_location = profile_location
     self.log_results = log_results
+    self.file_copy_fn = file_copy_fn
 
   def __enter__(self):
     logging.info('Start profiling: %s', self.profile_id)
@@ -51,14 +51,14 @@ class Profile(object):
     self.profile.disable()
     logging.info('Stop profiling: %s', self.profile_id)
 
-    if self.profile_location:
+    if self.profile_location and self.file_copy_fn:
       dump_location = os.path.join(
           self.profile_location, 'profile',
           ('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id)))
       fd, filename = tempfile.mkstemp()
       self.profile.dump_stats(filename)
       logging.info('Copying profiler data to: [%s]', dump_location)
-      _dependency_file_copy(filename, dump_location)  # pylint: disable=protected-access
+      self.file_copy_fn(filename, dump_location)  # pylint: disable=protected-access
       os.close(fd)
       os.remove(filename)
 

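With the change above, Profile no longer imports the dependency module itself;
callers that want the dump uploaded pass a copy callable in explicitly. A
minimal usage sketch (the profile location and the workload are illustrative,
and the copy helper assumes gsutil access):

    # pylint: disable=protected-access
    from apache_beam.runners.dataflow.internal.dependency import _dependency_file_copy
    from apache_beam.utils.profiler import Profile

    def expensive_work():
      return sum(i * i for i in range(10 ** 6))

    # Without both profile_location and file_copy_fn the profile is still
    # collected, but nothing is copied out.
    with Profile('my-step', profile_location='gs://my-bucket/profiles',
                 file_copy_fn=_dependency_file_copy):
      expensive_work()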