http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
deleted file mode 100644
index 98473ca..0000000
--- 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
+++ /dev/null
@@ -1,726 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Dataflow client utility functions."""
-
-import codecs
-import getpass
-import json
-import logging
-import os
-import re
-import time
-from StringIO import StringIO
-from datetime import datetime
-
-from apitools.base.py import encoding
-from apitools.base.py import exceptions
-
-from apache_beam import utils
-from apache_beam.internal.auth import get_service_credentials
-from apache_beam.internal.google_cloud_platform.json_value import to_json_value
-from apache_beam.io.google_cloud_platform.internal.clients import storage
-from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
-from apache_beam.transforms import cy_combiners
-from apache_beam.transforms.display import DisplayData
-from apache_beam.utils import dependency
-from apache_beam.utils import retry
-from apache_beam.utils.dependency import get_required_container_version
-from apache_beam.utils.dependency import get_sdk_name_and_version
-from apache_beam.utils.names import PropertyNames
-from apache_beam.utils.pipeline_options import DebugOptions
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import StandardOptions
-from apache_beam.utils.pipeline_options import WorkerOptions
-
-
-class Step(object):
-  """Wrapper for a dataflow Step protobuf."""
-
-  def __init__(self, step_kind, step_name, additional_properties=None):
-    self.step_kind = step_kind
-    self.step_name = step_name
-    self.proto = dataflow.Step(kind=step_kind, name=step_name)
-    self.proto.properties = {}
-    self._additional_properties = []
-
-    if additional_properties is not None:
-      for (n, v, t) in additional_properties:
-        self.add_property(n, v, t)
-
-  def add_property(self, name, value, with_type=False):
-    self._additional_properties.append((name, value, with_type))
-    self.proto.properties.additionalProperties.append(
-        dataflow.Step.PropertiesValue.AdditionalProperty(
-            key=name, value=to_json_value(value, with_type=with_type)))
-
-  def _get_outputs(self):
-    """Returns a list of all output labels for a step."""
-    outputs = []
-    for p in self.proto.properties.additionalProperties:
-      if p.key == PropertyNames.OUTPUT_INFO:
-        for entry in p.value.array_value.entries:
-          for entry_prop in entry.object_value.properties:
-            if entry_prop.key == PropertyNames.OUTPUT_NAME:
-              outputs.append(entry_prop.value.string_value)
-    return outputs
-
-  def __reduce__(self):
-    """Reduce hook for pickling the Step class more easily."""
-    return (Step, (self.step_kind, self.step_name, 
self._additional_properties))
-
-  def get_output(self, tag=None):
-    """Returns name if it is one of the outputs or first output if name is 
None.
-
-    Args:
-      tag: tag of the output as a string or None if we want to get the
-        name of the first output.
-
-    Returns:
-      The name of the output associated with the tag or the first output
-      if tag was None.
-
-    Raises:
-      ValueError: if the tag does not exist within outputs.
-    """
-    outputs = self._get_outputs()
-    if tag is None:
-      return outputs[0]
-    else:
-      name = '%s_%s' % (PropertyNames.OUT, tag)
-      if name not in outputs:
-        raise ValueError(
-            'Cannot find named output: %s in %s.' % (name, outputs))
-      return name
-
-
-class Environment(object):
-  """Wrapper for a dataflow Environment protobuf."""
-
-  def __init__(self, packages, options, environment_version):
-    self.standard_options = options.view_as(StandardOptions)
-    self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    self.worker_options = options.view_as(WorkerOptions)
-    self.debug_options = options.view_as(DebugOptions)
-    self.proto = dataflow.Environment()
-    self.proto.clusterManagerApiService = 
GoogleCloudOptions.COMPUTE_API_SERVICE
-    self.proto.dataset = '{}/cloud_dataflow'.format(
-        GoogleCloudOptions.BIGQUERY_API_SERVICE)
-    self.proto.tempStoragePrefix = (
-        self.google_cloud_options.temp_location.replace(
-            'gs:/',
-            GoogleCloudOptions.STORAGE_API_SERVICE))
-    # User agent information.
-    self.proto.userAgent = dataflow.Environment.UserAgentValue()
-    self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
-
-    if self.google_cloud_options.service_account_email:
-      self.proto.serviceAccountEmail = (
-          self.google_cloud_options.service_account_email)
-
-    sdk_name, version_string = get_sdk_name_and_version()
-
-    self.proto.userAgent.additionalProperties.extend([
-        dataflow.Environment.UserAgentValue.AdditionalProperty(
-            key='name',
-            value=to_json_value(sdk_name)),
-        dataflow.Environment.UserAgentValue.AdditionalProperty(
-            key='version', value=to_json_value(version_string))])
-    # Version information.
-    self.proto.version = dataflow.Environment.VersionValue()
-    if self.standard_options.streaming:
-      job_type = 'PYTHON_STREAMING'
-    else:
-      job_type = 'PYTHON_BATCH'
-    self.proto.version.additionalProperties.extend([
-        dataflow.Environment.VersionValue.AdditionalProperty(
-            key='job_type',
-            value=to_json_value(job_type)),
-        dataflow.Environment.VersionValue.AdditionalProperty(
-            key='major', value=to_json_value(environment_version))])
-    # Experiments
-    if self.debug_options.experiments:
-      for experiment in self.debug_options.experiments:
-        self.proto.experiments.append(experiment)
-    # Worker pool(s) information.
-    package_descriptors = []
-    for package in packages:
-      package_descriptors.append(
-          dataflow.Package(
-              location='%s/%s' % (
-                  self.google_cloud_options.staging_location.replace(
-                      'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
-                  package),
-              name=package))
-
-    pool = dataflow.WorkerPool(
-        kind='local' if self.local else 'harness',
-        packages=package_descriptors,
-        taskrunnerSettings=dataflow.TaskRunnerSettings(
-            parallelWorkerSettings=dataflow.WorkerSettings(
-                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
-                servicePath=self.google_cloud_options.dataflow_endpoint)))
-    pool.autoscalingSettings = dataflow.AutoscalingSettings()
-    # Set worker pool options received through command line.
-    if self.worker_options.num_workers:
-      pool.numWorkers = self.worker_options.num_workers
-    if self.worker_options.max_num_workers:
-      pool.autoscalingSettings.maxNumWorkers = (
-          self.worker_options.max_num_workers)
-    if self.worker_options.autoscaling_algorithm:
-      values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
-      pool.autoscalingSettings.algorithm = {
-          'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
-          'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
-      }.get(self.worker_options.autoscaling_algorithm)
-    if self.worker_options.machine_type:
-      pool.machineType = self.worker_options.machine_type
-    if self.worker_options.disk_size_gb:
-      pool.diskSizeGb = self.worker_options.disk_size_gb
-    if self.worker_options.disk_type:
-      pool.diskType = self.worker_options.disk_type
-    if self.worker_options.zone:
-      pool.zone = self.worker_options.zone
-    if self.worker_options.network:
-      pool.network = self.worker_options.network
-    if self.worker_options.worker_harness_container_image:
-      pool.workerHarnessContainerImage = (
-          self.worker_options.worker_harness_container_image)
-    else:
-      # Default to using the worker harness container image for the current SDK
-      # version.
-      pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' %
-          get_required_container_version())
-    if self.worker_options.use_public_ips is not None:
-      if self.worker_options.use_public_ips:
-        pool.ipConfiguration = (
-            dataflow.WorkerPool
-            .IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
-      else:
-        pool.ipConfiguration = (
-            dataflow.WorkerPool
-            .IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
-
-    if self.standard_options.streaming:
-      # Use separate data disk for streaming.
-      disk = dataflow.Disk()
-      if self.local:
-        disk.diskType = 'local'
-      # TODO(ccy): allow customization of disk.
-      pool.dataDisks.append(disk)
-    self.proto.workerPools.append(pool)
-
-    sdk_pipeline_options = options.get_all_options()
-    if sdk_pipeline_options:
-      self.proto.sdkPipelineOptions = (
-          dataflow.Environment.SdkPipelineOptionsValue())
-
-      options_dict = {k: v
-                      for k, v in sdk_pipeline_options.iteritems()
-                      if v is not None}
-      self.proto.sdkPipelineOptions.additionalProperties.append(
-          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-              key='options', value=to_json_value(options_dict)))
-
-      dd = DisplayData.create_from_options(options)
-      items = [item.get_dict() for item in dd.items]
-      self.proto.sdkPipelineOptions.additionalProperties.append(
-          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-              key='display_data', value=to_json_value(items)))
-
-
-class Job(object):
-  """Wrapper for a dataflow Job protobuf."""
-
-  def __str__(self):
-    def encode_shortstrings(input_buffer, errors='strict'):
-      """Encoder (from Unicode) that suppresses long base64 strings."""
-      original_len = len(input_buffer)
-      if original_len > 150:
-        if self.base64_str_re.match(input_buffer):
-          input_buffer = '<string of %d bytes>' % original_len
-          input_buffer = input_buffer.encode('ascii', errors=errors)
-        else:
-          matched = self.coder_str_re.match(input_buffer)
-          if matched:
-            input_buffer = '%s<string of %d bytes>' % (
-                matched.group(1), matched.end(2) - matched.start(2))
-            input_buffer = input_buffer.encode('ascii', errors=errors)
-      return input_buffer, original_len
-
-    def decode_shortstrings(input_buffer, errors='strict'):
-      """Decoder (to Unicode) that suppresses long base64 strings."""
-      shortened, length = encode_shortstrings(input_buffer, errors)
-      return unicode(shortened), length
-
-    def shortstrings_registerer(encoding_name):
-      if encoding_name == 'shortstrings':
-        return codecs.CodecInfo(name='shortstrings',
-                                encode=encode_shortstrings,
-                                decode=decode_shortstrings)
-      return None
-
-    codecs.register(shortstrings_registerer)
-
-    # Use json "dump string" method to get readable formatting;
-    # further modify it to not output too-long strings, aimed at the
-    # 10,000+ character hex-encoded "serialized_fn" values.
-    return json.dumps(
-        json.loads(encoding.MessageToJson(self.proto), 
encoding='shortstrings'),
-        indent=2, sort_keys=True)
-
-  @staticmethod
-  def default_job_name(job_name):
-    if job_name is None:
-      user_name = getpass.getuser().lower()
-      date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
-      app_name = 'beamapp'
-      job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
-    return job_name
-
-  def __init__(self, options):
-    self.options = options
-    self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    if not self.google_cloud_options.job_name:
-      self.google_cloud_options.job_name = self.default_job_name(
-          self.google_cloud_options.job_name)
-
-    required_google_cloud_options = ['project', 'job_name', 'temp_location']
-    missing = [
-        option for option in required_google_cloud_options
-        if not getattr(self.google_cloud_options, option)]
-    if missing:
-      raise ValueError(
-          'Missing required configuration parameters: %s' % missing)
-
-    if not self.google_cloud_options.staging_location:
-      logging.info('Defaulting to the temp_location as staging_location: %s',
-                   self.google_cloud_options.temp_location)
-      (self.google_cloud_options
-       .staging_location) = self.google_cloud_options.temp_location
-
-    # Make the staging and temp locations job name and time specific. This is
-    # needed to avoid clashes between job submissions using the same staging
-    # area or team members using same job names. This method is not entirely
-    # foolproof since two job submissions with same name can happen at exactly
-    # the same time. However the window is extremely small given that
-    # time.time() has at least microseconds granularity. We add the suffix only
-    # for GCS staging locations where the potential for such clashes is high.
-    if self.google_cloud_options.staging_location.startswith('gs://'):
-      path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
-      self.google_cloud_options.staging_location = utils.path.join(
-          self.google_cloud_options.staging_location, path_suffix)
-      self.google_cloud_options.temp_location = utils.path.join(
-          self.google_cloud_options.temp_location, path_suffix)
-    self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
-    if self.options.view_as(StandardOptions).streaming:
-      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
-    else:
-      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
-    self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
-    self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
-
-  def json(self):
-    return encoding.MessageToJson(self.proto)
-
-  def __reduce__(self):
-    """Reduce hook for pickling the Job class more easily."""
-    return (Job, (self.options,))
-
-
-class DataflowApplicationClient(object):
-  """A Dataflow API client used by application code to create and query 
jobs."""
-
-  def __init__(self, options, environment_version):
-    """Initializes a Dataflow API client object."""
-    self.standard_options = options.view_as(StandardOptions)
-    self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    self.environment_version = environment_version
-    if self.google_cloud_options.no_auth:
-      credentials = None
-    else:
-      credentials = get_service_credentials()
-    self._client = dataflow.DataflowV1b3(
-        url=self.google_cloud_options.dataflow_endpoint,
-        credentials=credentials,
-        get_credentials=(not self.google_cloud_options.no_auth))
-    self._storage_client = storage.StorageV1(
-        url='https://www.googleapis.com/storage/v1',
-        credentials=credentials,
-        get_credentials=(not self.google_cloud_options.no_auth))
-
-  # TODO(silviuc): Refactor so that retry logic can be applied.
-  @retry.no_retries  # Using no_retries marks this as an integration point.
-  def _gcs_file_copy(self, from_path, to_path):
-    to_folder, to_name = os.path.split(to_path)
-    with open(from_path, 'rb') as f:
-      self.stage_file(to_folder, to_name, f)
-
-  def stage_file(self, gcs_or_local_path, file_name, stream,
-                 mime_type='application/octet-stream'):
-    """Stages a file at a GCS or local path with stream-supplied contents."""
-    if not gcs_or_local_path.startswith('gs://'):
-      local_path = os.path.join(gcs_or_local_path, file_name)
-      logging.info('Staging file locally to %s', local_path)
-      with open(local_path, 'wb') as f:
-        f.write(stream.read())
-      return
-    gcs_location = gcs_or_local_path + '/' + file_name
-    bucket, name = gcs_location[5:].split('/', 1)
-
-    request = storage.StorageObjectsInsertRequest(
-        bucket=bucket, name=name)
-    logging.info('Starting GCS upload to %s...', gcs_location)
-    upload = storage.Upload(stream, mime_type)
-    try:
-      response = self._storage_client.objects.Insert(request, upload=upload)
-    except exceptions.HttpError as e:
-      reportable_errors = {
-          403: 'access denied',
-          404: 'bucket not found',
-      }
-      if e.status_code in reportable_errors:
-        raise IOError(('Could not upload to GCS path %s: %s. Please verify '
-                       'that credentials are valid and that you have write '
-                       'access to the specified path. Stale credentials can be 
'
-                       'refreshed by executing "gcloud auth login".') %
-                      (gcs_or_local_path, reportable_errors[e.status_code]))
-      raise
-    logging.info('Completed GCS upload to %s', gcs_location)
-    return response
-
-  # TODO(silviuc): Refactor so that retry logic can be applied.
-  @retry.no_retries  # Using no_retries marks this as an integration point.
-  def create_job(self, job):
-    """Creates job description. May stage and/or submit for remote 
execution."""
-    self.create_job_description(job)
-
-    # Stage and submit the job when necessary
-    dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
-    template_location = (
-        job.options.view_as(GoogleCloudOptions).template_location)
-
-    job_location = template_location or dataflow_job_file
-    if job_location:
-      gcs_or_local_path = os.path.dirname(job_location)
-      file_name = os.path.basename(job_location)
-      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
-
-    if not template_location:
-      return self.submit_job_description(job)
-    else:
-      return None
-
-  def create_job_description(self, job):
-    """Creates a job described by the workflow proto."""
-    resources = dependency.stage_job_resources(
-        job.options, file_copy=self._gcs_file_copy)
-    job.proto.environment = Environment(
-        packages=resources, options=job.options,
-        environment_version=self.environment_version).proto
-    # TODO(silviuc): Remove the debug logging eventually.
-    logging.info('JOB: %s', job)
-
-  def submit_job_description(self, job):
-    """Creates and excutes a job request."""
-    request = dataflow.DataflowProjectsJobsCreateRequest()
-    request.projectId = self.google_cloud_options.project
-    request.job = job.proto
-
-    try:
-      response = self._client.projects_jobs.Create(request)
-    except exceptions.BadStatusCodeError as e:
-      logging.error('HTTP status %d trying to create job'
-                    ' at dataflow service endpoint %s',
-                    e.response.status,
-                    self.google_cloud_options.dataflow_endpoint)
-      logging.fatal('details of server error: %s', e)
-      raise
-    logging.info('Create job: %s', response)
-    # The response is a Job proto with the id for the new job.
-    logging.info('Created job with id: [%s]', response.id)
-    logging.info(
-        'To access the Dataflow monitoring console, please navigate to '
-        'https://console.developers.google.com/project/%s/dataflow/job/%s',
-        self.google_cloud_options.project, response.id)
-
-    return response
-
-  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def modify_job_state(self, job_id, new_state):
-    """Modify the run state of the job.
-
-    Args:
-      job_id: The id of the job.
-      new_state: A string representing the new desired state. It could be set 
to
-      either 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED' or 'JOB_STATE_DRAINING'.
-
-    Returns:
-      True if the job was modified successfully.
-    """
-    if new_state == 'JOB_STATE_DONE':
-      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE
-    elif new_state == 'JOB_STATE_CANCELLED':
-      new_state = 
dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED
-    elif new_state == 'JOB_STATE_DRAINING':
-      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING
-    else:
-      # Other states could only be set by the service.
-      return False
-
-    request = dataflow.DataflowProjectsJobsUpdateRequest()
-    request.jobId = job_id
-    request.projectId = self.google_cloud_options.project
-    request.job = dataflow.Job(requestedState=new_state)
-
-    self._client.projects_jobs.Update(request)
-    return True
-
-  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def get_job(self, job_id):
-    """Gets the job status for a submitted job.
-
-    Args:
-      job_id: A string representing the job_id for the workflow as returned
-        by the a create_job() request.
-
-    Returns:
-      A Job proto. See below for interesting fields.
-
-    The Job proto returned from a get_job() request contains some interesting
-    fields:
-      currentState: An object representing the current state of the job. The
-        string representation of the object (str() result) has the following
-        possible values: JOB_STATE_UNKNONW, JOB_STATE_STOPPED,
-        JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED,
-        JOB_STATE_CANCELLED.
-      createTime: UTC time when the job was created
-        (e.g. '2015-03-10T00:01:53.074Z')
-      currentStateTime: UTC time for the current state of the job.
-    """
-    request = dataflow.DataflowProjectsJobsGetRequest()
-    request.jobId = job_id
-    request.projectId = self.google_cloud_options.project
-    response = self._client.projects_jobs.Get(request)
-    return response
-
-  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def list_messages(
-      self, job_id, start_time=None, end_time=None, page_token=None,
-      minimum_importance=None):
-    """List messages associated with the execution of a job.
-
-    Args:
-      job_id: A string representing the job_id for the workflow as returned
-        by the a create_job() request.
-      start_time: If specified, only messages generated after the start time
-        will be returned, otherwise all messages since job started will be
-        returned. The value is a string representing UTC time
-        (e.g., '2015-08-18T21:03:50.644Z')
-      end_time: If specified, only messages generated before the end time
-        will be returned, otherwise all messages up to current time will be
-        returned. The value is a string representing UTC time
-        (e.g., '2015-08-18T21:03:50.644Z')
-      page_token: A string to be used as next page token if the list call
-        returned paginated results.
-      minimum_importance: Filter for messages based on importance. The possible
-        string values in increasing order of importance are: JOB_MESSAGE_DEBUG,
-        JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING,
-        JOB_MESSAGE_ERROR. For example, a filter set on warning will allow only
-        warnings and errors and exclude all others.
-
-    Returns:
-      A tuple consisting of a list of JobMessage instances and a
-      next page token string.
-
-    Raises:
-      RuntimeError: if an unexpected value for the message_importance argument
-        is used.
-
-    The JobMessage objects returned by the call contain the following  fields:
-      id: A unique string identifier for the message.
-      time: A string representing the UTC time of the message
-        (e.g., '2015-08-18T21:03:50.644Z')
-      messageImportance: An enumeration value for the message importance. The
-        value if converted to string will have the following possible values:
-        JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC,
-        JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
-     messageText: A message string.
-    """
-    request = dataflow.DataflowProjectsJobsMessagesListRequest(
-        jobId=job_id, projectId=self.google_cloud_options.project)
-    if page_token is not None:
-      request.pageToken = page_token
-    if start_time is not None:
-      request.startTime = start_time
-    if end_time is not None:
-      request.endTime = end_time
-    if minimum_importance is not None:
-      if minimum_importance == 'JOB_MESSAGE_DEBUG':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
-            .MinimumImportanceValueValuesEnum
-            .JOB_MESSAGE_DEBUG)
-      elif minimum_importance == 'JOB_MESSAGE_DETAILED':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
-            .MinimumImportanceValueValuesEnum
-            .JOB_MESSAGE_DETAILED)
-      elif minimum_importance == 'JOB_MESSAGE_BASIC':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
-            .MinimumImportanceValueValuesEnum
-            .JOB_MESSAGE_BASIC)
-      elif minimum_importance == 'JOB_MESSAGE_WARNING':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
-            .MinimumImportanceValueValuesEnum
-            .JOB_MESSAGE_WARNING)
-      elif minimum_importance == 'JOB_MESSAGE_ERROR':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
-            .MinimumImportanceValueValuesEnum
-            .JOB_MESSAGE_ERROR)
-      else:
-        raise RuntimeError(
-            'Unexpected value for minimum_importance argument: %r',
-            minimum_importance)
-    response = self._client.projects_jobs_messages.List(request)
-    return response.jobMessages, response.nextPageToken
-
-
-class MetricUpdateTranslators(object):
-  """Translators between accumulators and dataflow metric updates."""
-
-  @staticmethod
-  def translate_boolean(accumulator, metric_update_proto):
-    metric_update_proto.boolean = accumulator.value
-
-  @staticmethod
-  def translate_scalar_mean_int(accumulator, metric_update_proto):
-    if accumulator.count:
-      metric_update_proto.integerMean = dataflow.IntegerMean()
-      metric_update_proto.integerMean.sum = to_split_int(accumulator.sum)
-      metric_update_proto.integerMean.count = to_split_int(accumulator.count)
-    else:
-      metric_update_proto.nameAndKind.kind = None
-
-  @staticmethod
-  def translate_scalar_mean_float(accumulator, metric_update_proto):
-    if accumulator.count:
-      metric_update_proto.floatingPointMean = dataflow.FloatingPointMean()
-      metric_update_proto.floatingPointMean.sum = accumulator.sum
-      metric_update_proto.floatingPointMean.count = to_split_int(
-          accumulator.count)
-    else:
-      metric_update_proto.nameAndKind.kind = None
-
-  @staticmethod
-  def translate_scalar_counter_int(accumulator, metric_update_proto):
-    metric_update_proto.integer = to_split_int(accumulator.value)
-
-  @staticmethod
-  def translate_scalar_counter_float(accumulator, metric_update_proto):
-    metric_update_proto.floatingPoint = accumulator.value
-
-
-def to_split_int(n):
-  res = dataflow.SplitInt64()
-  res.lowBits = n & 0xffffffff
-  res.highBits = n >> 32
-  return res
-
-
-def translate_distribution(distribution_update, metric_update_proto):
-  """Translate metrics DistributionUpdate to dataflow distribution update."""
-  dist_update_proto = dataflow.DistributionUpdate()
-  dist_update_proto.min = to_split_int(distribution_update.min)
-  dist_update_proto.max = to_split_int(distribution_update.max)
-  dist_update_proto.count = to_split_int(distribution_update.count)
-  dist_update_proto.sum = to_split_int(distribution_update.sum)
-  metric_update_proto.distribution = dist_update_proto
-
-
-def translate_value(value, metric_update_proto):
-  metric_update_proto.integer = to_split_int(value)
-
-
-def translate_scalar(accumulator, metric_update):
-  metric_update.scalar = to_json_value(accumulator.value, with_type=True)
-
-
-def translate_mean(accumulator, metric_update):
-  if accumulator.count:
-    metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
-    metric_update.meanCount = to_json_value(accumulator.count, with_type=True)
-  else:
-    # A denominator of 0 will raise an error in the service.
-    # What it means is we have nothing to report yet, so don't.
-    metric_update.kind = None
-
-
-# To enable a counter on the service, add it to this dictionary.
-metric_translations = {
-    cy_combiners.CountCombineFn: ('sum', translate_scalar),
-    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
-    cy_combiners.MinInt64Fn: ('min', translate_scalar),
-    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
-    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
-    cy_combiners.SumFloatFn: ('sum', translate_scalar),
-    cy_combiners.MinFloatFn: ('min', translate_scalar),
-    cy_combiners.MaxFloatFn: ('max', translate_scalar),
-    cy_combiners.MeanFloatFn: ('mean', translate_mean),
-    cy_combiners.AllCombineFn: ('and', translate_scalar),
-    cy_combiners.AnyCombineFn: ('or', translate_scalar),
-}
-
-counter_translations = {
-    cy_combiners.CountCombineFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.SUM,
-        MetricUpdateTranslators.translate_scalar_counter_int),
-    cy_combiners.SumInt64Fn: (
-        dataflow.NameAndKind.KindValueValuesEnum.SUM,
-        MetricUpdateTranslators.translate_scalar_counter_int),
-    cy_combiners.MinInt64Fn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MIN,
-        MetricUpdateTranslators.translate_scalar_counter_int),
-    cy_combiners.MaxInt64Fn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MAX,
-        MetricUpdateTranslators.translate_scalar_counter_int),
-    cy_combiners.MeanInt64Fn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
-        MetricUpdateTranslators.translate_scalar_mean_int),
-    cy_combiners.SumFloatFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.SUM,
-        MetricUpdateTranslators.translate_scalar_counter_float),
-    cy_combiners.MinFloatFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MIN,
-        MetricUpdateTranslators.translate_scalar_counter_float),
-    cy_combiners.MaxFloatFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MAX,
-        MetricUpdateTranslators.translate_scalar_counter_float),
-    cy_combiners.MeanFloatFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
-        MetricUpdateTranslators.translate_scalar_mean_float),
-    cy_combiners.AllCombineFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.AND,
-        MetricUpdateTranslators.translate_boolean),
-    cy_combiners.AnyCombineFn: (
-        dataflow.NameAndKind.KindValueValuesEnum.OR,
-        MetricUpdateTranslators.translate_boolean),
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
 
b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
deleted file mode 100644
index 2c53e37..0000000
--- 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Unit tests for the apiclient module."""
-
-import unittest
-
-from mock import Mock
-
-from apache_beam.metrics.cells import DistributionData
-from apache_beam.utils.pipeline_options import PipelineOptions
-
-from apache_beam.runners.google_cloud_dataflow.dataflow_runner import 
DataflowRunner
-from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apache_beam.runners.google_cloud_dataflow.internal import apiclient
-except ImportError:
-  apiclient = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
-@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-class UtilTest(unittest.TestCase):
-
-  @unittest.skip("Enable once BEAM-1080 is fixed.")
-  def test_create_application_client(self):
-    pipeline_options = PipelineOptions()
-    apiclient.DataflowApplicationClient(
-        pipeline_options,
-        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
-
-  def test_default_job_name(self):
-    job_name = apiclient.Job.default_job_name(None)
-    regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}'
-    self.assertRegexpMatches(job_name, regexp)
-
-  def test_split_int(self):
-    number = 12345
-    split_number = apiclient.to_split_int(number)
-    self.assertEqual((split_number.lowBits, split_number.highBits),
-                     (number, 0))
-    shift_number = number << 32
-    split_number = apiclient.to_split_int(shift_number)
-    self.assertEqual((split_number.lowBits, split_number.highBits),
-                     (0, number))
-
-  def test_translate_distribution(self):
-    metric_update = dataflow.CounterUpdate()
-    distribution_update = DistributionData(16, 2, 1, 15)
-    apiclient.translate_distribution(distribution_update, metric_update)
-    self.assertEqual(metric_update.distribution.min.lowBits,
-                     distribution_update.min)
-    self.assertEqual(metric_update.distribution.max.lowBits,
-                     distribution_update.max)
-    self.assertEqual(metric_update.distribution.sum.lowBits,
-                     distribution_update.sum)
-    self.assertEqual(metric_update.distribution.count.lowBits,
-                     distribution_update.count)
-
-  def test_translate_means(self):
-    metric_update = dataflow.CounterUpdate()
-    accumulator = Mock()
-    accumulator.sum = 16
-    accumulator.count = 2
-    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
-                                                                metric_update)
-    self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
-    self.assertEqual(metric_update.integerMean.count.lowBits, 
accumulator.count)
-
-    accumulator.sum = 16.0
-    accumulator.count = 2
-    apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
-                                                                  
metric_update)
-    self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
-    self.assertEqual(
-        metric_update.floatingPointMean.count.lowBits, accumulator.count)
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
 
b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
 
b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
deleted file mode 100644
index 1399f21..0000000
--- 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Common imports for generated dataflow client library."""
-# pylint:disable=wildcard-import
-
-import pkgutil
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py import *
-  from 
apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages
 import *
-  from 
apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client
 import *
-except ImportError:
-  pass
-# pylint: enable=wrong-import-order, wrong-import-position
-
-__path__ = pkgutil.extend_path(__path__, __name__)

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
 
b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
deleted file mode 100644
index 4d3d525..0000000
--- 
a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
+++ /dev/null
@@ -1,684 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Generated client library for dataflow version v1b3."""
-# NOTE: This file is autogenerated and should not be edited by hand.
-from apitools.base.py import base_api
-
-from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow 
import dataflow_v1b3_messages as messages
-
-
-class DataflowV1b3(base_api.BaseApiClient):
-  """Generated client library for service dataflow version v1b3."""
-
-  MESSAGES_MODULE = messages
-  BASE_URL = u'https://dataflow.googleapis.com/'
-
-  _PACKAGE = u'dataflow'
-  _SCOPES = [u'https://www.googleapis.com/auth/cloud-platform', 
u'https://www.googleapis.com/auth/userinfo.email']
-  _VERSION = u'v1b3'
-  _CLIENT_ID = '1042881264118.apps.googleusercontent.com'
-  _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b'
-  _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b'
-  _CLIENT_CLASS_NAME = u'DataflowV1b3'
-  _URL_VERSION = u'v1b3'
-  _API_KEY = None
-
-  def __init__(self, url='', credentials=None,
-               get_credentials=True, http=None, model=None,
-               log_request=False, log_response=False,
-               credentials_args=None, default_global_params=None,
-               additional_http_headers=None):
-    """Create a new dataflow handle."""
-    url = url or self.BASE_URL
-    super(DataflowV1b3, self).__init__(
-        url, credentials=credentials,
-        get_credentials=get_credentials, http=http, model=model,
-        log_request=log_request, log_response=log_response,
-        credentials_args=credentials_args,
-        default_global_params=default_global_params,
-        additional_http_headers=additional_http_headers)
-    self.projects_jobs_debug = self.ProjectsJobsDebugService(self)
-    self.projects_jobs_messages = self.ProjectsJobsMessagesService(self)
-    self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self)
-    self.projects_jobs = self.ProjectsJobsService(self)
-    self.projects_locations_jobs_messages = 
self.ProjectsLocationsJobsMessagesService(self)
-    self.projects_locations_jobs_workItems = 
self.ProjectsLocationsJobsWorkItemsService(self)
-    self.projects_locations_jobs = self.ProjectsLocationsJobsService(self)
-    self.projects_locations = self.ProjectsLocationsService(self)
-    self.projects_templates = self.ProjectsTemplatesService(self)
-    self.projects = self.ProjectsService(self)
-
-  class ProjectsJobsDebugService(base_api.BaseApiService):
-    """Service class for the projects_jobs_debug resource."""
-
-    _NAME = u'projects_jobs_debug'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def GetConfig(self, request, global_params=None):
-      """Get encoded debug configuration for component. Not cacheable.
-
-      Args:
-        request: (DataflowProjectsJobsDebugGetConfigRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (GetDebugConfigResponse) The response message.
-      """
-      config = self.GetMethodConfig('GetConfig')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    GetConfig.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.jobs.debug.getConfig',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig',
-        request_field=u'getDebugConfigRequest',
-        request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest',
-        response_type_name=u'GetDebugConfigResponse',
-        supports_download=False,
-    )
-
-    def SendCapture(self, request, global_params=None):
-      """Send encoded debug capture data for component.
-
-      Args:
-        request: (DataflowProjectsJobsDebugSendCaptureRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (SendDebugCaptureResponse) The response message.
-      """
-      config = self.GetMethodConfig('SendCapture')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    SendCapture.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.jobs.debug.sendCapture',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture',
-        request_field=u'sendDebugCaptureRequest',
-        request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest',
-        response_type_name=u'SendDebugCaptureResponse',
-        supports_download=False,
-    )
-
-  class ProjectsJobsMessagesService(base_api.BaseApiService):
-    """Service class for the projects_jobs_messages resource."""
-
-    _NAME = u'projects_jobs_messages'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsJobsMessagesService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def List(self, request, global_params=None):
-      """Request the job status.
-
-      Args:
-        request: (DataflowProjectsJobsMessagesListRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ListJobMessagesResponse) The response message.
-      """
-      config = self.GetMethodConfig('List')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    List.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.jobs.messages.list',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[u'endTime', u'location', u'minimumImportance', 
u'pageSize', u'pageToken', u'startTime'],
-        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages',
-        request_field='',
-        request_type_name=u'DataflowProjectsJobsMessagesListRequest',
-        response_type_name=u'ListJobMessagesResponse',
-        supports_download=False,
-    )
-
-  class ProjectsJobsWorkItemsService(base_api.BaseApiService):
-    """Service class for the projects_jobs_workItems resource."""
-
-    _NAME = u'projects_jobs_workItems'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsJobsWorkItemsService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def Lease(self, request, global_params=None):
-      """Leases a dataflow WorkItem to run.
-
-      Args:
-        request: (DataflowProjectsJobsWorkItemsLeaseRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (LeaseWorkItemResponse) The response message.
-      """
-      config = self.GetMethodConfig('Lease')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Lease.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.jobs.workItems.lease',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease',
-        request_field=u'leaseWorkItemRequest',
-        request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest',
-        response_type_name=u'LeaseWorkItemResponse',
-        supports_download=False,
-    )
-
-    def ReportStatus(self, request, global_params=None):
-      """Reports the status of dataflow WorkItems leased by a worker.
-
-      Args:
-        request: (DataflowProjectsJobsWorkItemsReportStatusRequest) input 
message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ReportWorkItemStatusResponse) The response message.
-      """
-      config = self.GetMethodConfig('ReportStatus')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    ReportStatus.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.jobs.workItems.reportStatus',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus',
-        request_field=u'reportWorkItemStatusRequest',
-        request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest',
-        response_type_name=u'ReportWorkItemStatusResponse',
-        supports_download=False,
-    )
-
-  class ProjectsJobsService(base_api.BaseApiService):
-    """Service class for the projects_jobs resource."""
-
-    _NAME = u'projects_jobs'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsJobsService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def Create(self, request, global_params=None):
-      """Creates a Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsJobsCreateRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Create')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Create.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.jobs.create',
-        ordered_params=[u'projectId'],
-        path_params=[u'projectId'],
-        query_params=[u'location', u'replaceJobId', u'view'],
-        relative_path=u'v1b3/projects/{projectId}/jobs',
-        request_field=u'job',
-        request_type_name=u'DataflowProjectsJobsCreateRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-    def Get(self, request, global_params=None):
-      """Gets the state of the specified Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsJobsGetRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Get')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Get.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.jobs.get',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[u'location', u'view'],
-        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}',
-        request_field='',
-        request_type_name=u'DataflowProjectsJobsGetRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-    def GetMetrics(self, request, global_params=None):
-      """Request the job status.
-
-      Args:
-        request: (DataflowProjectsJobsGetMetricsRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (JobMetrics) The response message.
-      """
-      config = self.GetMethodConfig('GetMetrics')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    GetMetrics.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.jobs.getMetrics',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[u'location', u'startTime'],
-        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics',
-        request_field='',
-        request_type_name=u'DataflowProjectsJobsGetMetricsRequest',
-        response_type_name=u'JobMetrics',
-        supports_download=False,
-    )
-
-    def List(self, request, global_params=None):
-      """List the jobs of a project.
-
-      Args:
-        request: (DataflowProjectsJobsListRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ListJobsResponse) The response message.
-      """
-      config = self.GetMethodConfig('List')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    List.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.jobs.list',
-        ordered_params=[u'projectId'],
-        path_params=[u'projectId'],
-        query_params=[u'filter', u'location', u'pageSize', u'pageToken', 
u'view'],
-        relative_path=u'v1b3/projects/{projectId}/jobs',
-        request_field='',
-        request_type_name=u'DataflowProjectsJobsListRequest',
-        response_type_name=u'ListJobsResponse',
-        supports_download=False,
-    )
-
-    def Update(self, request, global_params=None):
-      """Updates the state of an existing Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsJobsUpdateRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Update')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Update.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'PUT',
-        method_id=u'dataflow.projects.jobs.update',
-        ordered_params=[u'projectId', u'jobId'],
-        path_params=[u'jobId', u'projectId'],
-        query_params=[u'location'],
-        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}',
-        request_field=u'job',
-        request_type_name=u'DataflowProjectsJobsUpdateRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-  class ProjectsLocationsJobsMessagesService(base_api.BaseApiService):
-    """Service class for the projects_locations_jobs_messages resource."""
-
-    _NAME = u'projects_locations_jobs_messages'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsLocationsJobsMessagesService, 
self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def List(self, request, global_params=None):
-      """Request the job status.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsMessagesListRequest) input 
message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ListJobMessagesResponse) The response message.
-      """
-      config = self.GetMethodConfig('List')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    List.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.locations.jobs.messages.list',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[u'endTime', u'minimumImportance', u'pageSize', 
u'pageToken', u'startTime'],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages',
-        request_field='',
-        request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest',
-        response_type_name=u'ListJobMessagesResponse',
-        supports_download=False,
-    )
-
-  class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService):
-    """Service class for the projects_locations_jobs_workItems resource."""
-
-    _NAME = u'projects_locations_jobs_workItems'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsLocationsJobsWorkItemsService, 
self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def Lease(self, request, global_params=None):
-      """Leases a dataflow WorkItem to run.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsWorkItemsLeaseRequest) input 
message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (LeaseWorkItemResponse) The response message.
-      """
-      config = self.GetMethodConfig('Lease')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Lease.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.locations.jobs.workItems.lease',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease',
-        request_field=u'leaseWorkItemRequest',
-        
request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest',
-        response_type_name=u'LeaseWorkItemResponse',
-        supports_download=False,
-    )
-
-    def ReportStatus(self, request, global_params=None):
-      """Reports the status of dataflow WorkItems leased by a worker.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsWorkItemsReportStatusRequest) 
input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ReportWorkItemStatusResponse) The response message.
-      """
-      config = self.GetMethodConfig('ReportStatus')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    ReportStatus.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus',
-        request_field=u'reportWorkItemStatusRequest',
-        
request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest',
-        response_type_name=u'ReportWorkItemStatusResponse',
-        supports_download=False,
-    )
-
-  class ProjectsLocationsJobsService(base_api.BaseApiService):
-    """Service class for the projects_locations_jobs resource."""
-
-    _NAME = u'projects_locations_jobs'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsLocationsJobsService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def Create(self, request, global_params=None):
-      """Creates a Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsCreateRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Create')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Create.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.locations.jobs.create',
-        ordered_params=[u'projectId', u'location'],
-        path_params=[u'location', u'projectId'],
-        query_params=[u'replaceJobId', u'view'],
-        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs',
-        request_field=u'job',
-        request_type_name=u'DataflowProjectsLocationsJobsCreateRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-    def Get(self, request, global_params=None):
-      """Gets the state of the specified Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsGetRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Get')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Get.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.locations.jobs.get',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[u'view'],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}',
-        request_field='',
-        request_type_name=u'DataflowProjectsLocationsJobsGetRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-    def GetMetrics(self, request, global_params=None):
-      """Request the job status.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsGetMetricsRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (JobMetrics) The response message.
-      """
-      config = self.GetMethodConfig('GetMetrics')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    GetMetrics.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.locations.jobs.getMetrics',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[u'startTime'],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics',
-        request_field='',
-        request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest',
-        response_type_name=u'JobMetrics',
-        supports_download=False,
-    )
-
-    def List(self, request, global_params=None):
-      """List the jobs of a project.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsListRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (ListJobsResponse) The response message.
-      """
-      config = self.GetMethodConfig('List')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    List.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'GET',
-        method_id=u'dataflow.projects.locations.jobs.list',
-        ordered_params=[u'projectId', u'location'],
-        path_params=[u'location', u'projectId'],
-        query_params=[u'filter', u'pageSize', u'pageToken', u'view'],
-        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs',
-        request_field='',
-        request_type_name=u'DataflowProjectsLocationsJobsListRequest',
-        response_type_name=u'ListJobsResponse',
-        supports_download=False,
-    )
-
-    def Update(self, request, global_params=None):
-      """Updates the state of an existing Cloud Dataflow job.
-
-      Args:
-        request: (DataflowProjectsLocationsJobsUpdateRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Update')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Update.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'PUT',
-        method_id=u'dataflow.projects.locations.jobs.update',
-        ordered_params=[u'projectId', u'location', u'jobId'],
-        path_params=[u'jobId', u'location', u'projectId'],
-        query_params=[],
-        
relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}',
-        request_field=u'job',
-        request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-  class ProjectsLocationsService(base_api.BaseApiService):
-    """Service class for the projects_locations resource."""
-
-    _NAME = u'projects_locations'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsLocationsService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-  class ProjectsTemplatesService(base_api.BaseApiService):
-    """Service class for the projects_templates resource."""
-
-    _NAME = u'projects_templates'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsTemplatesService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def Create(self, request, global_params=None):
-      """Creates a Cloud Dataflow job from a template.
-
-      Args:
-        request: (DataflowProjectsTemplatesCreateRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (Job) The response message.
-      """
-      config = self.GetMethodConfig('Create')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    Create.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.templates.create',
-        ordered_params=[u'projectId'],
-        path_params=[u'projectId'],
-        query_params=[],
-        relative_path=u'v1b3/projects/{projectId}/templates',
-        request_field=u'createJobFromTemplateRequest',
-        request_type_name=u'DataflowProjectsTemplatesCreateRequest',
-        response_type_name=u'Job',
-        supports_download=False,
-    )
-
-  class ProjectsService(base_api.BaseApiService):
-    """Service class for the projects resource."""
-
-    _NAME = u'projects'
-
-    def __init__(self, client):
-      super(DataflowV1b3.ProjectsService, self).__init__(client)
-      self._upload_configs = {
-          }
-
-    def WorkerMessages(self, request, global_params=None):
-      """Send a worker_message to the service.
-
-      Args:
-        request: (DataflowProjectsWorkerMessagesRequest) input message
-        global_params: (StandardQueryParameters, default: None) global 
arguments
-      Returns:
-        (SendWorkerMessagesResponse) The response message.
-      """
-      config = self.GetMethodConfig('WorkerMessages')
-      return self._RunMethod(
-          config, request, global_params=global_params)
-
-    WorkerMessages.method_config = lambda: base_api.ApiMethodInfo(
-        http_method=u'POST',
-        method_id=u'dataflow.projects.workerMessages',
-        ordered_params=[u'projectId'],
-        path_params=[u'projectId'],
-        query_params=[],
-        relative_path=u'v1b3/projects/{projectId}/WorkerMessages',
-        request_field=u'sendWorkerMessagesRequest',
-        request_type_name=u'DataflowProjectsWorkerMessagesRequest',
-        response_type_name=u'SendWorkerMessagesResponse',
-        supports_download=False,
-    )

Reply via email to