Repository: beam
Updated Branches:
  refs/heads/master 2dd1907c6 -> bf5aa1bca
Use SDK harness container for FnAPI jobs when
worker_harness_container_image is not specified.

Add a separate image tag to use with the SDK harness container.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f46a40c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f46a40c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f46a40c2

Branch: refs/heads/master
Commit: f46a40c279499737bb7fb45af5e299d76f6af49b
Parents: 2dd1907
Author: Valentyn Tymofieiev <valen...@google.com>
Authored: Wed Jun 28 16:41:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 10:35:53 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py  |  6 +--
 .../runners/dataflow/internal/dependency.py | 44 +++++++++++++++++---
 2 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index df1a3f2..edac9d7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -38,7 +38,6 @@ from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.dependency import get_required_container_version
 from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.transforms import cy_combiners
@@ -205,11 +204,8 @@ class Environment(object):
       pool.workerHarnessContainerImage = (
           self.worker_options.worker_harness_container_image)
     else:
-      # Default to using the worker harness container image for the current SDK
-      # version.
       pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' %
-          get_required_container_version())
+          dependency.get_default_container_image_for_current_sdk(job_type))
     if self.worker_options.use_public_ips is not None:
       if self.worker_options.use_public_ips:
         pool.ipConfiguration = (

http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 03e1794..a40a582 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -71,9 +71,15 @@ from apache_beam.options.pipeline_options import SetupOptions
 
 
 # Update this version to the next version whenever there is a change that will
-# require changes to the execution environment.
+# require changes to the legacy Dataflow worker execution environment.
 # This should be in the beam-[version]-[date] format, date is optional.
+# BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION should coincide
+# when we make a release.
 BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
+# Update this version to the next version whenever there is a change that
+# requires changes to the SDK harness container or the SDK harness launcher.
+# This should be in the beam-[version]-[date] format, date is optional.
+BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'
 
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
@@ -474,10 +480,33 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
           'type of location: %s' % sdk_remote_location)
 
 
-def get_required_container_version():
+def get_default_container_image_for_current_sdk(job_type):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Args:
+    job_type (str): BEAM job type.
+
+  Returns:
+    str: Google Cloud Dataflow container image for remote execution.
+  """
+  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
+  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+    image_name = 'dataflow.gcr.io/v1beta3/python-fnapi'
+  else:
+    image_name = 'dataflow.gcr.io/v1beta3/python'
+  image_tag = _get_required_container_version(job_type)
+  return image_name + ':' + image_tag
+
+
+def _get_required_container_version(job_type=None):
   """For internal use only; no backwards-compatibility guarantees.
 
-  Returns the Google Cloud Dataflow container version for remote execution.
+  Args:
+    job_type (str, optional): BEAM job type. Defaults to None.
+
+  Returns:
+    str: The tag of worker container images in GCR that corresponds to
+      the current version of the SDK.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -493,7 +522,10 @@ def get_required_container_version():
   except pkg.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
-    return BEAM_CONTAINER_VERSION
+    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+      return BEAM_FNAPI_CONTAINER_VERSION
+    else:
+      return BEAM_CONTAINER_VERSION
@@ -501,7 +533,7 @@ def get_sdk_name_and_version():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns name and version of SDK reported to Google Cloud Dataflow."""
   import pkg_resources as pkg
-  container_version = get_required_container_version()
+  container_version = _get_required_container_version()
   try:
     pkg.get_distribution(GOOGLE_PACKAGE_NAME)
     return ('Google Cloud Dataflow SDK for Python', container_version)
@@ -513,7 +545,7 @@ def get_sdk_package_name():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns the PyPI package name to be staged to Google Cloud Dataflow."""
-  container_version = get_required_container_version()
+  container_version = _get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
   return GOOGLE_PACKAGE_NAME
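----------------------------------------------------------------------

For illustration only, not part of the commit: a minimal standalone
sketch of the image-selection behavior introduced above, assuming the
fallback container versions apply (the installed-distribution lookup
inside _get_required_container_version is elided here). The helper
default_image is a hypothetical stand-in for the combination of
get_default_container_image_for_current_sdk and
_get_required_container_version:

    # Hypothetical sketch mirroring the selection logic added in
    # dependency.py; the constants are copied from the diff above.
    BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
    BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'

    def default_image(job_type):
        # FnAPI jobs get the SDK harness image and its own tag;
        # all other job types keep the legacy worker image and tag.
        if job_type in ('FNAPI_BATCH', 'FNAPI_STREAMING'):
            return ('dataflow.gcr.io/v1beta3/python-fnapi:' +
                    BEAM_FNAPI_CONTAINER_VERSION)
        return 'dataflow.gcr.io/v1beta3/python:' + BEAM_CONTAINER_VERSION

    print(default_image('FNAPI_BATCH'))
    # -> dataflow.gcr.io/v1beta3/python-fnapi:beam-2.1.0-20170621
    print(default_image('PYTHON_BATCH'))  # any non-FnAPI job type
    # -> dataflow.gcr.io/v1beta3/python:beam-2.1.0-20170626

In other words, a FnAPI job that does not set
worker_harness_container_image now defaults to the python-fnapi image
with its own version tag, while legacy jobs keep the existing python
image and tag.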