http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py deleted file mode 100644 index 98473ca..0000000 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py +++ /dev/null @@ -1,726 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Dataflow client utility functions.""" - -import codecs -import getpass -import json -import logging -import os -import re -import time -from StringIO import StringIO -from datetime import datetime - -from apitools.base.py import encoding -from apitools.base.py import exceptions - -from apache_beam import utils -from apache_beam.internal.auth import get_service_credentials -from apache_beam.internal.google_cloud_platform.json_value import to_json_value -from apache_beam.io.google_cloud_platform.internal.clients import storage -from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow -from apache_beam.transforms import cy_combiners -from apache_beam.transforms.display import DisplayData -from apache_beam.utils import dependency -from apache_beam.utils import retry -from apache_beam.utils.dependency import get_required_container_version -from apache_beam.utils.dependency import get_sdk_name_and_version -from apache_beam.utils.names import PropertyNames -from apache_beam.utils.pipeline_options import DebugOptions -from apache_beam.utils.pipeline_options import GoogleCloudOptions -from apache_beam.utils.pipeline_options import StandardOptions -from apache_beam.utils.pipeline_options import WorkerOptions - - -class Step(object): - """Wrapper for a dataflow Step protobuf.""" - - def __init__(self, step_kind, step_name, additional_properties=None): - self.step_kind = step_kind - self.step_name = step_name - self.proto = dataflow.Step(kind=step_kind, name=step_name) - self.proto.properties = {} - self._additional_properties = [] - - if additional_properties is not None: - for (n, v, t) in additional_properties: - self.add_property(n, v, t) - - def add_property(self, name, value, with_type=False): - self._additional_properties.append((name, value, with_type)) - self.proto.properties.additionalProperties.append( - dataflow.Step.PropertiesValue.AdditionalProperty( - key=name, value=to_json_value(value, with_type=with_type))) - - def _get_outputs(self): - """Returns a list of all output labels for a step.""" - outputs = [] - for p in self.proto.properties.additionalProperties: - if p.key == PropertyNames.OUTPUT_INFO: - for entry in p.value.array_value.entries: - for 
entry_prop in entry.object_value.properties: - if entry_prop.key == PropertyNames.OUTPUT_NAME: - outputs.append(entry_prop.value.string_value) - return outputs - - def __reduce__(self): - """Reduce hook for pickling the Step class more easily.""" - return (Step, (self.step_kind, self.step_name, self._additional_properties)) - - def get_output(self, tag=None): - """Returns name if it is one of the outputs or first output if name is None. - - Args: - tag: tag of the output as a string or None if we want to get the - name of the first output. - - Returns: - The name of the output associated with the tag or the first output - if tag was None. - - Raises: - ValueError: if the tag does not exist within outputs. - """ - outputs = self._get_outputs() - if tag is None: - return outputs[0] - else: - name = '%s_%s' % (PropertyNames.OUT, tag) - if name not in outputs: - raise ValueError( - 'Cannot find named output: %s in %s.' % (name, outputs)) - return name - - -class Environment(object): - """Wrapper for a dataflow Environment protobuf.""" - - def __init__(self, packages, options, environment_version): - self.standard_options = options.view_as(StandardOptions) - self.google_cloud_options = options.view_as(GoogleCloudOptions) - self.worker_options = options.view_as(WorkerOptions) - self.debug_options = options.view_as(DebugOptions) - self.proto = dataflow.Environment() - self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE - self.proto.dataset = '{}/cloud_dataflow'.format( - GoogleCloudOptions.BIGQUERY_API_SERVICE) - self.proto.tempStoragePrefix = ( - self.google_cloud_options.temp_location.replace( - 'gs:/', - GoogleCloudOptions.STORAGE_API_SERVICE)) - # User agent information. - self.proto.userAgent = dataflow.Environment.UserAgentValue() - self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint - - if self.google_cloud_options.service_account_email: - self.proto.serviceAccountEmail = ( - self.google_cloud_options.service_account_email) - - sdk_name, version_string = get_sdk_name_and_version() - - self.proto.userAgent.additionalProperties.extend([ - dataflow.Environment.UserAgentValue.AdditionalProperty( - key='name', - value=to_json_value(sdk_name)), - dataflow.Environment.UserAgentValue.AdditionalProperty( - key='version', value=to_json_value(version_string))]) - # Version information. - self.proto.version = dataflow.Environment.VersionValue() - if self.standard_options.streaming: - job_type = 'PYTHON_STREAMING' - else: - job_type = 'PYTHON_BATCH' - self.proto.version.additionalProperties.extend([ - dataflow.Environment.VersionValue.AdditionalProperty( - key='job_type', - value=to_json_value(job_type)), - dataflow.Environment.VersionValue.AdditionalProperty( - key='major', value=to_json_value(environment_version))]) - # Experiments - if self.debug_options.experiments: - for experiment in self.debug_options.experiments: - self.proto.experiments.append(experiment) - # Worker pool(s) information. 
- package_descriptors = [] - for package in packages: - package_descriptors.append( - dataflow.Package( - location='%s/%s' % ( - self.google_cloud_options.staging_location.replace( - 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), - package), - name=package)) - - pool = dataflow.WorkerPool( - kind='local' if self.local else 'harness', - packages=package_descriptors, - taskrunnerSettings=dataflow.TaskRunnerSettings( - parallelWorkerSettings=dataflow.WorkerSettings( - baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, - servicePath=self.google_cloud_options.dataflow_endpoint))) - pool.autoscalingSettings = dataflow.AutoscalingSettings() - # Set worker pool options received through command line. - if self.worker_options.num_workers: - pool.numWorkers = self.worker_options.num_workers - if self.worker_options.max_num_workers: - pool.autoscalingSettings.maxNumWorkers = ( - self.worker_options.max_num_workers) - if self.worker_options.autoscaling_algorithm: - values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum - pool.autoscalingSettings.algorithm = { - 'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE, - 'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC, - }.get(self.worker_options.autoscaling_algorithm) - if self.worker_options.machine_type: - pool.machineType = self.worker_options.machine_type - if self.worker_options.disk_size_gb: - pool.diskSizeGb = self.worker_options.disk_size_gb - if self.worker_options.disk_type: - pool.diskType = self.worker_options.disk_type - if self.worker_options.zone: - pool.zone = self.worker_options.zone - if self.worker_options.network: - pool.network = self.worker_options.network - if self.worker_options.worker_harness_container_image: - pool.workerHarnessContainerImage = ( - self.worker_options.worker_harness_container_image) - else: - # Default to using the worker harness container image for the current SDK - # version. - pool.workerHarnessContainerImage = ( - 'dataflow.gcr.io/v1beta3/python:%s' % - get_required_container_version()) - if self.worker_options.use_public_ips is not None: - if self.worker_options.use_public_ips: - pool.ipConfiguration = ( - dataflow.WorkerPool - .IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC) - else: - pool.ipConfiguration = ( - dataflow.WorkerPool - .IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE) - - if self.standard_options.streaming: - # Use separate data disk for streaming. - disk = dataflow.Disk() - if self.local: - disk.diskType = 'local' - # TODO(ccy): allow customization of disk. 
- pool.dataDisks.append(disk) - self.proto.workerPools.append(pool) - - sdk_pipeline_options = options.get_all_options() - if sdk_pipeline_options: - self.proto.sdkPipelineOptions = ( - dataflow.Environment.SdkPipelineOptionsValue()) - - options_dict = {k: v - for k, v in sdk_pipeline_options.iteritems() - if v is not None} - self.proto.sdkPipelineOptions.additionalProperties.append( - dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( - key='options', value=to_json_value(options_dict))) - - dd = DisplayData.create_from_options(options) - items = [item.get_dict() for item in dd.items] - self.proto.sdkPipelineOptions.additionalProperties.append( - dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( - key='display_data', value=to_json_value(items))) - - -class Job(object): - """Wrapper for a dataflow Job protobuf.""" - - def __str__(self): - def encode_shortstrings(input_buffer, errors='strict'): - """Encoder (from Unicode) that suppresses long base64 strings.""" - original_len = len(input_buffer) - if original_len > 150: - if self.base64_str_re.match(input_buffer): - input_buffer = '<string of %d bytes>' % original_len - input_buffer = input_buffer.encode('ascii', errors=errors) - else: - matched = self.coder_str_re.match(input_buffer) - if matched: - input_buffer = '%s<string of %d bytes>' % ( - matched.group(1), matched.end(2) - matched.start(2)) - input_buffer = input_buffer.encode('ascii', errors=errors) - return input_buffer, original_len - - def decode_shortstrings(input_buffer, errors='strict'): - """Decoder (to Unicode) that suppresses long base64 strings.""" - shortened, length = encode_shortstrings(input_buffer, errors) - return unicode(shortened), length - - def shortstrings_registerer(encoding_name): - if encoding_name == 'shortstrings': - return codecs.CodecInfo(name='shortstrings', - encode=encode_shortstrings, - decode=decode_shortstrings) - return None - - codecs.register(shortstrings_registerer) - - # Use json "dump string" method to get readable formatting; - # further modify it to not output too-long strings, aimed at the - # 10,000+ character hex-encoded "serialized_fn" values. - return json.dumps( - json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'), - indent=2, sort_keys=True) - - @staticmethod - def default_job_name(job_name): - if job_name is None: - user_name = getpass.getuser().lower() - date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f') - app_name = 'beamapp' - job_name = '{}-{}-{}'.format(app_name, user_name, date_component) - return job_name - - def __init__(self, options): - self.options = options - self.google_cloud_options = options.view_as(GoogleCloudOptions) - if not self.google_cloud_options.job_name: - self.google_cloud_options.job_name = self.default_job_name( - self.google_cloud_options.job_name) - - required_google_cloud_options = ['project', 'job_name', 'temp_location'] - missing = [ - option for option in required_google_cloud_options - if not getattr(self.google_cloud_options, option)] - if missing: - raise ValueError( - 'Missing required configuration parameters: %s' % missing) - - if not self.google_cloud_options.staging_location: - logging.info('Defaulting to the temp_location as staging_location: %s', - self.google_cloud_options.temp_location) - (self.google_cloud_options - .staging_location) = self.google_cloud_options.temp_location - - # Make the staging and temp locations job name and time specific. 
This is - # needed to avoid clashes between job submissions using the same staging - # area or team members using same job names. This method is not entirely - # foolproof since two job submissions with same name can happen at exactly - # the same time. However the window is extremely small given that - # time.time() has at least microseconds granularity. We add the suffix only - # for GCS staging locations where the potential for such clashes is high. - if self.google_cloud_options.staging_location.startswith('gs://'): - path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time()) - self.google_cloud_options.staging_location = utils.path.join( - self.google_cloud_options.staging_location, path_suffix) - self.google_cloud_options.temp_location = utils.path.join( - self.google_cloud_options.temp_location, path_suffix) - self.proto = dataflow.Job(name=self.google_cloud_options.job_name) - if self.options.view_as(StandardOptions).streaming: - self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING - else: - self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH - self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$') - self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$') - - def json(self): - return encoding.MessageToJson(self.proto) - - def __reduce__(self): - """Reduce hook for pickling the Job class more easily.""" - return (Job, (self.options,)) - - -class DataflowApplicationClient(object): - """A Dataflow API client used by application code to create and query jobs.""" - - def __init__(self, options, environment_version): - """Initializes a Dataflow API client object.""" - self.standard_options = options.view_as(StandardOptions) - self.google_cloud_options = options.view_as(GoogleCloudOptions) - self.environment_version = environment_version - if self.google_cloud_options.no_auth: - credentials = None - else: - credentials = get_service_credentials() - self._client = dataflow.DataflowV1b3( - url=self.google_cloud_options.dataflow_endpoint, - credentials=credentials, - get_credentials=(not self.google_cloud_options.no_auth)) - self._storage_client = storage.StorageV1( - url='https://www.googleapis.com/storage/v1', - credentials=credentials, - get_credentials=(not self.google_cloud_options.no_auth)) - - # TODO(silviuc): Refactor so that retry logic can be applied. - @retry.no_retries # Using no_retries marks this as an integration point. 
- def _gcs_file_copy(self, from_path, to_path): - to_folder, to_name = os.path.split(to_path) - with open(from_path, 'rb') as f: - self.stage_file(to_folder, to_name, f) - - def stage_file(self, gcs_or_local_path, file_name, stream, - mime_type='application/octet-stream'): - """Stages a file at a GCS or local path with stream-supplied contents.""" - if not gcs_or_local_path.startswith('gs://'): - local_path = os.path.join(gcs_or_local_path, file_name) - logging.info('Staging file locally to %s', local_path) - with open(local_path, 'wb') as f: - f.write(stream.read()) - return - gcs_location = gcs_or_local_path + '/' + file_name - bucket, name = gcs_location[5:].split('/', 1) - - request = storage.StorageObjectsInsertRequest( - bucket=bucket, name=name) - logging.info('Starting GCS upload to %s...', gcs_location) - upload = storage.Upload(stream, mime_type) - try: - response = self._storage_client.objects.Insert(request, upload=upload) - except exceptions.HttpError as e: - reportable_errors = { - 403: 'access denied', - 404: 'bucket not found', - } - if e.status_code in reportable_errors: - raise IOError(('Could not upload to GCS path %s: %s. Please verify ' - 'that credentials are valid and that you have write ' - 'access to the specified path. Stale credentials can be ' - 'refreshed by executing "gcloud auth login".') % - (gcs_or_local_path, reportable_errors[e.status_code])) - raise - logging.info('Completed GCS upload to %s', gcs_location) - return response - - # TODO(silviuc): Refactor so that retry logic can be applied. - @retry.no_retries # Using no_retries marks this as an integration point. - def create_job(self, job): - """Creates job description. May stage and/or submit for remote execution.""" - self.create_job_description(job) - - # Stage and submit the job when necessary - dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file - template_location = ( - job.options.view_as(GoogleCloudOptions).template_location) - - job_location = template_location or dataflow_job_file - if job_location: - gcs_or_local_path = os.path.dirname(job_location) - file_name = os.path.basename(job_location) - self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) - - if not template_location: - return self.submit_job_description(job) - else: - return None - - def create_job_description(self, job): - """Creates a job described by the workflow proto.""" - resources = dependency.stage_job_resources( - job.options, file_copy=self._gcs_file_copy) - job.proto.environment = Environment( - packages=resources, options=job.options, - environment_version=self.environment_version).proto - # TODO(silviuc): Remove the debug logging eventually. - logging.info('JOB: %s', job) - - def submit_job_description(self, job): - """Creates and executes a job request.""" - request = dataflow.DataflowProjectsJobsCreateRequest() - request.projectId = self.google_cloud_options.project - request.job = job.proto - - try: - response = self._client.projects_jobs.Create(request) - except exceptions.BadStatusCodeError as e: - logging.error('HTTP status %d trying to create job' - ' at dataflow service endpoint %s', - e.response.status, - self.google_cloud_options.dataflow_endpoint) - logging.fatal('details of server error: %s', e) - raise - logging.info('Create job: %s', response) - # The response is a Job proto with the id for the new job.
- logging.info('Created job with id: [%s]', response.id) - logging.info( - 'To access the Dataflow monitoring console, please navigate to ' - 'https://console.developers.google.com/project/%s/dataflow/job/%s', - self.google_cloud_options.project, response.id) - - return response - - @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py - def modify_job_state(self, job_id, new_state): - """Modify the run state of the job. - - Args: - job_id: The id of the job. - new_state: A string representing the new desired state. It could be set to - either 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED' or 'JOB_STATE_DRAINING'. - - Returns: - True if the job was modified successfully. - """ - if new_state == 'JOB_STATE_DONE': - new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE - elif new_state == 'JOB_STATE_CANCELLED': - new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED - elif new_state == 'JOB_STATE_DRAINING': - new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING - else: - # Other states could only be set by the service. - return False - - request = dataflow.DataflowProjectsJobsUpdateRequest() - request.jobId = job_id - request.projectId = self.google_cloud_options.project - request.job = dataflow.Job(requestedState=new_state) - - self._client.projects_jobs.Update(request) - return True - - @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py - def get_job(self, job_id): - """Gets the job status for a submitted job. - - Args: - job_id: A string representing the job_id for the workflow as returned - by a create_job() request. - - Returns: - A Job proto. See below for interesting fields. - - The Job proto returned from a get_job() request contains some interesting - fields: - currentState: An object representing the current state of the job. The - string representation of the object (str() result) has the following - possible values: JOB_STATE_UNKNOWN, JOB_STATE_STOPPED, - JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED, - JOB_STATE_CANCELLED. - createTime: UTC time when the job was created - (e.g. '2015-03-10T00:01:53.074Z') - currentStateTime: UTC time for the current state of the job. - """ - request = dataflow.DataflowProjectsJobsGetRequest() - request.jobId = job_id - request.projectId = self.google_cloud_options.project - response = self._client.projects_jobs.Get(request) - return response - - @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py - def list_messages( - self, job_id, start_time=None, end_time=None, page_token=None, - minimum_importance=None): - """List messages associated with the execution of a job. - - Args: - job_id: A string representing the job_id for the workflow as returned - by a create_job() request. - start_time: If specified, only messages generated after the start time - will be returned, otherwise all messages since job started will be - returned. The value is a string representing UTC time - (e.g., '2015-08-18T21:03:50.644Z') - end_time: If specified, only messages generated before the end time - will be returned, otherwise all messages up to current time will be - returned. The value is a string representing UTC time - (e.g., '2015-08-18T21:03:50.644Z') - page_token: A string to be used as next page token if the list call - returned paginated results. - minimum_importance: Filter for messages based on importance.
The possible - string values in increasing order of importance are: JOB_MESSAGE_DEBUG, - JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING, - JOB_MESSAGE_ERROR. For example, a filter set on warning will allow only - warnings and errors and exclude all others. - - Returns: - A tuple consisting of a list of JobMessage instances and a - next page token string. - - Raises: - RuntimeError: if an unexpected value for the message_importance argument - is used. - - The JobMessage objects returned by the call contain the following fields: - id: A unique string identifier for the message. - time: A string representing the UTC time of the message - (e.g., '2015-08-18T21:03:50.644Z') - messageImportance: An enumeration value for the message importance. The - value if converted to string will have the following possible values: - JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, - JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR. - messageText: A message string. - """ - request = dataflow.DataflowProjectsJobsMessagesListRequest( - jobId=job_id, projectId=self.google_cloud_options.project) - if page_token is not None: - request.pageToken = page_token - if start_time is not None: - request.startTime = start_time - if end_time is not None: - request.endTime = end_time - if minimum_importance is not None: - if minimum_importance == 'JOB_MESSAGE_DEBUG': - request.minimumImportance = ( - dataflow.DataflowProjectsJobsMessagesListRequest - .MinimumImportanceValueValuesEnum - .JOB_MESSAGE_DEBUG) - elif minimum_importance == 'JOB_MESSAGE_DETAILED': - request.minimumImportance = ( - dataflow.DataflowProjectsJobsMessagesListRequest - .MinimumImportanceValueValuesEnum - .JOB_MESSAGE_DETAILED) - elif minimum_importance == 'JOB_MESSAGE_BASIC': - request.minimumImportance = ( - dataflow.DataflowProjectsJobsMessagesListRequest - .MinimumImportanceValueValuesEnum - .JOB_MESSAGE_BASIC) - elif minimum_importance == 'JOB_MESSAGE_WARNING': - request.minimumImportance = ( - dataflow.DataflowProjectsJobsMessagesListRequest - .MinimumImportanceValueValuesEnum - .JOB_MESSAGE_WARNING) - elif minimum_importance == 'JOB_MESSAGE_ERROR': - request.minimumImportance = ( - dataflow.DataflowProjectsJobsMessagesListRequest - .MinimumImportanceValueValuesEnum - .JOB_MESSAGE_ERROR) - else: - raise RuntimeError( - 'Unexpected value for minimum_importance argument: %r', - minimum_importance) - response = self._client.projects_jobs_messages.List(request) - return response.jobMessages, response.nextPageToken - - -class MetricUpdateTranslators(object): - """Translators between accumulators and dataflow metric updates.""" - - @staticmethod - def translate_boolean(accumulator, metric_update_proto): - metric_update_proto.boolean = accumulator.value - - @staticmethod - def translate_scalar_mean_int(accumulator, metric_update_proto): - if accumulator.count: - metric_update_proto.integerMean = dataflow.IntegerMean() - metric_update_proto.integerMean.sum = to_split_int(accumulator.sum) - metric_update_proto.integerMean.count = to_split_int(accumulator.count) - else: - metric_update_proto.nameAndKind.kind = None - - @staticmethod - def translate_scalar_mean_float(accumulator, metric_update_proto): - if accumulator.count: - metric_update_proto.floatingPointMean = dataflow.FloatingPointMean() - metric_update_proto.floatingPointMean.sum = accumulator.sum - metric_update_proto.floatingPointMean.count = to_split_int( - accumulator.count) - else: - metric_update_proto.nameAndKind.kind = None - - @staticmethod - def 
translate_scalar_counter_int(accumulator, metric_update_proto): - metric_update_proto.integer = to_split_int(accumulator.value) - - @staticmethod - def translate_scalar_counter_float(accumulator, metric_update_proto): - metric_update_proto.floatingPoint = accumulator.value - - -def to_split_int(n): - res = dataflow.SplitInt64() - res.lowBits = n & 0xffffffff - res.highBits = n >> 32 - return res - - -def translate_distribution(distribution_update, metric_update_proto): - """Translate metrics DistributionUpdate to dataflow distribution update.""" - dist_update_proto = dataflow.DistributionUpdate() - dist_update_proto.min = to_split_int(distribution_update.min) - dist_update_proto.max = to_split_int(distribution_update.max) - dist_update_proto.count = to_split_int(distribution_update.count) - dist_update_proto.sum = to_split_int(distribution_update.sum) - metric_update_proto.distribution = dist_update_proto - - -def translate_value(value, metric_update_proto): - metric_update_proto.integer = to_split_int(value) - - -def translate_scalar(accumulator, metric_update): - metric_update.scalar = to_json_value(accumulator.value, with_type=True) - - -def translate_mean(accumulator, metric_update): - if accumulator.count: - metric_update.meanSum = to_json_value(accumulator.sum, with_type=True) - metric_update.meanCount = to_json_value(accumulator.count, with_type=True) - else: - # A denominator of 0 will raise an error in the service. - # What it means is we have nothing to report yet, so don't. - metric_update.kind = None - - -# To enable a counter on the service, add it to this dictionary. -metric_translations = { - cy_combiners.CountCombineFn: ('sum', translate_scalar), - cy_combiners.SumInt64Fn: ('sum', translate_scalar), - cy_combiners.MinInt64Fn: ('min', translate_scalar), - cy_combiners.MaxInt64Fn: ('max', translate_scalar), - cy_combiners.MeanInt64Fn: ('mean', translate_mean), - cy_combiners.SumFloatFn: ('sum', translate_scalar), - cy_combiners.MinFloatFn: ('min', translate_scalar), - cy_combiners.MaxFloatFn: ('max', translate_scalar), - cy_combiners.MeanFloatFn: ('mean', translate_mean), - cy_combiners.AllCombineFn: ('and', translate_scalar), - cy_combiners.AnyCombineFn: ('or', translate_scalar), -} - -counter_translations = { - cy_combiners.CountCombineFn: ( - dataflow.NameAndKind.KindValueValuesEnum.SUM, - MetricUpdateTranslators.translate_scalar_counter_int), - cy_combiners.SumInt64Fn: ( - dataflow.NameAndKind.KindValueValuesEnum.SUM, - MetricUpdateTranslators.translate_scalar_counter_int), - cy_combiners.MinInt64Fn: ( - dataflow.NameAndKind.KindValueValuesEnum.MIN, - MetricUpdateTranslators.translate_scalar_counter_int), - cy_combiners.MaxInt64Fn: ( - dataflow.NameAndKind.KindValueValuesEnum.MAX, - MetricUpdateTranslators.translate_scalar_counter_int), - cy_combiners.MeanInt64Fn: ( - dataflow.NameAndKind.KindValueValuesEnum.MEAN, - MetricUpdateTranslators.translate_scalar_mean_int), - cy_combiners.SumFloatFn: ( - dataflow.NameAndKind.KindValueValuesEnum.SUM, - MetricUpdateTranslators.translate_scalar_counter_float), - cy_combiners.MinFloatFn: ( - dataflow.NameAndKind.KindValueValuesEnum.MIN, - MetricUpdateTranslators.translate_scalar_counter_float), - cy_combiners.MaxFloatFn: ( - dataflow.NameAndKind.KindValueValuesEnum.MAX, - MetricUpdateTranslators.translate_scalar_counter_float), - cy_combiners.MeanFloatFn: ( - dataflow.NameAndKind.KindValueValuesEnum.MEAN, - MetricUpdateTranslators.translate_scalar_mean_float), - cy_combiners.AllCombineFn: ( - 
dataflow.NameAndKind.KindValueValuesEnum.AND, - MetricUpdateTranslators.translate_boolean), - cy_combiners.AnyCombineFn: ( - dataflow.NameAndKind.KindValueValuesEnum.OR, - MetricUpdateTranslators.translate_boolean), -}
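A note on the counter plumbing in the deleted apiclient.py above: the Dataflow counter protos carry 64-bit integers as a pair of 32-bit words, which is exactly what to_split_int produces. Below is a minimal standalone sketch of that split-and-reassemble logic; it uses a plain namedtuple as a stand-in for the generated dataflow.SplitInt64 message so it runs without the apitools client, and from_split_int is an illustrative helper that does not exist in the module.

from collections import namedtuple

# Stand-in for dataflow.SplitInt64 (hypothetical substitute; the real proto
# exposes the same lowBits/highBits fields).
SplitInt64 = namedtuple('SplitInt64', ['lowBits', 'highBits'])

def to_split_int(n):
    # Keep the low 32 bits and shift the remainder into the high word,
    # mirroring apiclient.to_split_int above.
    return SplitInt64(lowBits=n & 0xffffffff, highBits=n >> 32)

def from_split_int(split):
    # Reassemble the original value from the two 32-bit words.
    return (split.highBits << 32) | split.lowBits

assert to_split_int(12345) == SplitInt64(lowBits=12345, highBits=0)
assert to_split_int(12345 << 32) == SplitInt64(lowBits=0, highBits=12345)
assert from_split_int(to_split_int(10 ** 15)) == 10 ** 15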
http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py deleted file mode 100644 index 2c53e37..0000000 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py +++ /dev/null @@ -1,96 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""Unit tests for the apiclient module.""" - -import unittest - -from mock import Mock - -from apache_beam.metrics.cells import DistributionData -from apache_beam.utils.pipeline_options import PipelineOptions - -from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner -from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow - -# Protect against environments where apitools library is not available. 
-# pylint: disable=wrong-import-order, wrong-import-position -try: - from apache_beam.runners.google_cloud_dataflow.internal import apiclient -except ImportError: - apiclient = None -# pylint: enable=wrong-import-order, wrong-import-position - - -@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') -class UtilTest(unittest.TestCase): - - @unittest.skip("Enable once BEAM-1080 is fixed.") - def test_create_application_client(self): - pipeline_options = PipelineOptions() - apiclient.DataflowApplicationClient( - pipeline_options, - DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION) - - def test_default_job_name(self): - job_name = apiclient.Job.default_job_name(None) - regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}' - self.assertRegexpMatches(job_name, regexp) - - def test_split_int(self): - number = 12345 - split_number = apiclient.to_split_int(number) - self.assertEqual((split_number.lowBits, split_number.highBits), - (number, 0)) - shift_number = number << 32 - split_number = apiclient.to_split_int(shift_number) - self.assertEqual((split_number.lowBits, split_number.highBits), - (0, number)) - - def test_translate_distribution(self): - metric_update = dataflow.CounterUpdate() - distribution_update = DistributionData(16, 2, 1, 15) - apiclient.translate_distribution(distribution_update, metric_update) - self.assertEqual(metric_update.distribution.min.lowBits, - distribution_update.min) - self.assertEqual(metric_update.distribution.max.lowBits, - distribution_update.max) - self.assertEqual(metric_update.distribution.sum.lowBits, - distribution_update.sum) - self.assertEqual(metric_update.distribution.count.lowBits, - distribution_update.count) - - def test_translate_means(self): - metric_update = dataflow.CounterUpdate() - accumulator = Mock() - accumulator.sum = 16 - accumulator.count = 2 - apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator, - metric_update) - self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum) - self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count) - - accumulator.sum = 16.0 - accumulator.count = 2 - apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator, - metric_update) - self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum) - self.assertEqual( - metric_update.floatingPointMean.count.lowBits, accumulator.count) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py deleted file mode 100644 index cce3aca..0000000 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py deleted file mode 100644 index 1399f21..0000000 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Common imports for generated dataflow client library.""" -# pylint:disable=wildcard-import - -import pkgutil - -# Protect against environments where apitools library is not available. -# pylint: disable=wrong-import-order, wrong-import-position -try: - from apitools.base.py import * - from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import * - from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import * -except ImportError: - pass -# pylint: enable=wrong-import-order, wrong-import-position - -__path__ = pkgutil.extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py deleted file mode 100644 index 4d3d525..0000000 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py +++ /dev/null @@ -1,684 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Generated client library for dataflow version v1b3.""" -# NOTE: This file is autogenerated and should not be edited by hand. -from apitools.base.py import base_api - -from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import dataflow_v1b3_messages as messages - - -class DataflowV1b3(base_api.BaseApiClient): - """Generated client library for service dataflow version v1b3.""" - - MESSAGES_MODULE = messages - BASE_URL = u'https://dataflow.googleapis.com/' - - _PACKAGE = u'dataflow' - _SCOPES = [u'https://www.googleapis.com/auth/cloud-platform', u'https://www.googleapis.com/auth/userinfo.email'] - _VERSION = u'v1b3' - _CLIENT_ID = '1042881264118.apps.googleusercontent.com' - _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b' - _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b' - _CLIENT_CLASS_NAME = u'DataflowV1b3' - _URL_VERSION = u'v1b3' - _API_KEY = None - - def __init__(self, url='', credentials=None, - get_credentials=True, http=None, model=None, - log_request=False, log_response=False, - credentials_args=None, default_global_params=None, - additional_http_headers=None): - """Create a new dataflow handle.""" - url = url or self.BASE_URL - super(DataflowV1b3, self).__init__( - url, credentials=credentials, - get_credentials=get_credentials, http=http, model=model, - log_request=log_request, log_response=log_response, - credentials_args=credentials_args, - default_global_params=default_global_params, - additional_http_headers=additional_http_headers) - self.projects_jobs_debug = self.ProjectsJobsDebugService(self) - self.projects_jobs_messages = self.ProjectsJobsMessagesService(self) - self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self) - self.projects_jobs = self.ProjectsJobsService(self) - self.projects_locations_jobs_messages = self.ProjectsLocationsJobsMessagesService(self) - self.projects_locations_jobs_workItems = self.ProjectsLocationsJobsWorkItemsService(self) - self.projects_locations_jobs = self.ProjectsLocationsJobsService(self) - self.projects_locations = self.ProjectsLocationsService(self) - self.projects_templates = self.ProjectsTemplatesService(self) - self.projects = self.ProjectsService(self) - - class ProjectsJobsDebugService(base_api.BaseApiService): - """Service class for the projects_jobs_debug resource.""" - - _NAME = u'projects_jobs_debug' - - def __init__(self, client): - super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client) - self._upload_configs = { - } - - def GetConfig(self, request, global_params=None): - """Get encoded debug configuration for component. Not cacheable. - - Args: - request: (DataflowProjectsJobsDebugGetConfigRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (GetDebugConfigResponse) The response message. 
- """ - config = self.GetMethodConfig('GetConfig') - return self._RunMethod( - config, request, global_params=global_params) - - GetConfig.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.getConfig', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', - request_field=u'getDebugConfigRequest', - request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', - response_type_name=u'GetDebugConfigResponse', - supports_download=False, - ) - - def SendCapture(self, request, global_params=None): - """Send encoded debug capture data for component. - - Args: - request: (DataflowProjectsJobsDebugSendCaptureRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (SendDebugCaptureResponse) The response message. - """ - config = self.GetMethodConfig('SendCapture') - return self._RunMethod( - config, request, global_params=global_params) - - SendCapture.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.sendCapture', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture', - request_field=u'sendDebugCaptureRequest', - request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest', - response_type_name=u'SendDebugCaptureResponse', - supports_download=False, - ) - - class ProjectsJobsMessagesService(base_api.BaseApiService): - """Service class for the projects_jobs_messages resource.""" - - _NAME = u'projects_jobs_messages' - - def __init__(self, client): - super(DataflowV1b3.ProjectsJobsMessagesService, self).__init__(client) - self._upload_configs = { - } - - def List(self, request, global_params=None): - """Request the job status. - - Args: - request: (DataflowProjectsJobsMessagesListRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ListJobMessagesResponse) The response message. - """ - config = self.GetMethodConfig('List') - return self._RunMethod( - config, request, global_params=global_params) - - List.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.messages.list', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'endTime', u'location', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages', - request_field='', - request_type_name=u'DataflowProjectsJobsMessagesListRequest', - response_type_name=u'ListJobMessagesResponse', - supports_download=False, - ) - - class ProjectsJobsWorkItemsService(base_api.BaseApiService): - """Service class for the projects_jobs_workItems resource.""" - - _NAME = u'projects_jobs_workItems' - - def __init__(self, client): - super(DataflowV1b3.ProjectsJobsWorkItemsService, self).__init__(client) - self._upload_configs = { - } - - def Lease(self, request, global_params=None): - """Leases a dataflow WorkItem to run. - - Args: - request: (DataflowProjectsJobsWorkItemsLeaseRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (LeaseWorkItemResponse) The response message. 
- """ - config = self.GetMethodConfig('Lease') - return self._RunMethod( - config, request, global_params=global_params) - - Lease.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.workItems.lease', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease', - request_field=u'leaseWorkItemRequest', - request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest', - response_type_name=u'LeaseWorkItemResponse', - supports_download=False, - ) - - def ReportStatus(self, request, global_params=None): - """Reports the status of dataflow WorkItems leased by a worker. - - Args: - request: (DataflowProjectsJobsWorkItemsReportStatusRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ReportWorkItemStatusResponse) The response message. - """ - config = self.GetMethodConfig('ReportStatus') - return self._RunMethod( - config, request, global_params=global_params) - - ReportStatus.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.workItems.reportStatus', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus', - request_field=u'reportWorkItemStatusRequest', - request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest', - response_type_name=u'ReportWorkItemStatusResponse', - supports_download=False, - ) - - class ProjectsJobsService(base_api.BaseApiService): - """Service class for the projects_jobs resource.""" - - _NAME = u'projects_jobs' - - def __init__(self, client): - super(DataflowV1b3.ProjectsJobsService, self).__init__(client) - self._upload_configs = { - } - - def Create(self, request, global_params=None): - """Creates a Cloud Dataflow job. - - Args: - request: (DataflowProjectsJobsCreateRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. - """ - config = self.GetMethodConfig('Create') - return self._RunMethod( - config, request, global_params=global_params) - - Create.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.create', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'location', u'replaceJobId', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs', - request_field=u'job', - request_type_name=u'DataflowProjectsJobsCreateRequest', - response_type_name=u'Job', - supports_download=False, - ) - - def Get(self, request, global_params=None): - """Gets the state of the specified Cloud Dataflow job. - - Args: - request: (DataflowProjectsJobsGetRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. 
- """ - config = self.GetMethodConfig('Get') - return self._RunMethod( - config, request, global_params=global_params) - - Get.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.get', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', - request_field='', - request_type_name=u'DataflowProjectsJobsGetRequest', - response_type_name=u'Job', - supports_download=False, - ) - - def GetMetrics(self, request, global_params=None): - """Request the job status. - - Args: - request: (DataflowProjectsJobsGetMetricsRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (JobMetrics) The response message. - """ - config = self.GetMethodConfig('GetMetrics') - return self._RunMethod( - config, request, global_params=global_params) - - GetMetrics.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.getMetrics', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics', - request_field='', - request_type_name=u'DataflowProjectsJobsGetMetricsRequest', - response_type_name=u'JobMetrics', - supports_download=False, - ) - - def List(self, request, global_params=None): - """List the jobs of a project. - - Args: - request: (DataflowProjectsJobsListRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ListJobsResponse) The response message. - """ - config = self.GetMethodConfig('List') - return self._RunMethod( - config, request, global_params=global_params) - - List.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.list', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs', - request_field='', - request_type_name=u'DataflowProjectsJobsListRequest', - response_type_name=u'ListJobsResponse', - supports_download=False, - ) - - def Update(self, request, global_params=None): - """Updates the state of an existing Cloud Dataflow job. - - Args: - request: (DataflowProjectsJobsUpdateRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. 
- """ - config = self.GetMethodConfig('Update') - return self._RunMethod( - config, request, global_params=global_params) - - Update.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'PUT', - method_id=u'dataflow.projects.jobs.update', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', - request_field=u'job', - request_type_name=u'DataflowProjectsJobsUpdateRequest', - response_type_name=u'Job', - supports_download=False, - ) - - class ProjectsLocationsJobsMessagesService(base_api.BaseApiService): - """Service class for the projects_locations_jobs_messages resource.""" - - _NAME = u'projects_locations_jobs_messages' - - def __init__(self, client): - super(DataflowV1b3.ProjectsLocationsJobsMessagesService, self).__init__(client) - self._upload_configs = { - } - - def List(self, request, global_params=None): - """Request the job status. - - Args: - request: (DataflowProjectsLocationsJobsMessagesListRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ListJobMessagesResponse) The response message. - """ - config = self.GetMethodConfig('List') - return self._RunMethod( - config, request, global_params=global_params) - - List.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.messages.list', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'endTime', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest', - response_type_name=u'ListJobMessagesResponse', - supports_download=False, - ) - - class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService): - """Service class for the projects_locations_jobs_workItems resource.""" - - _NAME = u'projects_locations_jobs_workItems' - - def __init__(self, client): - super(DataflowV1b3.ProjectsLocationsJobsWorkItemsService, self).__init__(client) - self._upload_configs = { - } - - def Lease(self, request, global_params=None): - """Leases a dataflow WorkItem to run. - - Args: - request: (DataflowProjectsLocationsJobsWorkItemsLeaseRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (LeaseWorkItemResponse) The response message. - """ - config = self.GetMethodConfig('Lease') - return self._RunMethod( - config, request, global_params=global_params) - - Lease.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.workItems.lease', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease', - request_field=u'leaseWorkItemRequest', - request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest', - response_type_name=u'LeaseWorkItemResponse', - supports_download=False, - ) - - def ReportStatus(self, request, global_params=None): - """Reports the status of dataflow WorkItems leased by a worker. 
- - Args: - request: (DataflowProjectsLocationsJobsWorkItemsReportStatusRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ReportWorkItemStatusResponse) The response message. - """ - config = self.GetMethodConfig('ReportStatus') - return self._RunMethod( - config, request, global_params=global_params) - - ReportStatus.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus', - request_field=u'reportWorkItemStatusRequest', - request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest', - response_type_name=u'ReportWorkItemStatusResponse', - supports_download=False, - ) - - class ProjectsLocationsJobsService(base_api.BaseApiService): - """Service class for the projects_locations_jobs resource.""" - - _NAME = u'projects_locations_jobs' - - def __init__(self, client): - super(DataflowV1b3.ProjectsLocationsJobsService, self).__init__(client) - self._upload_configs = { - } - - def Create(self, request, global_params=None): - """Creates a Cloud Dataflow job. - - Args: - request: (DataflowProjectsLocationsJobsCreateRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. - """ - config = self.GetMethodConfig('Create') - return self._RunMethod( - config, request, global_params=global_params) - - Create.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.create', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'replaceJobId', u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', - request_field=u'job', - request_type_name=u'DataflowProjectsLocationsJobsCreateRequest', - response_type_name=u'Job', - supports_download=False, - ) - - def Get(self, request, global_params=None): - """Gets the state of the specified Cloud Dataflow job. - - Args: - request: (DataflowProjectsLocationsJobsGetRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. - """ - config = self.GetMethodConfig('Get') - return self._RunMethod( - config, request, global_params=global_params) - - Get.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.get', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsGetRequest', - response_type_name=u'Job', - supports_download=False, - ) - - def GetMetrics(self, request, global_params=None): - """Request the job status. - - Args: - request: (DataflowProjectsLocationsJobsGetMetricsRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (JobMetrics) The response message. 
- """ - config = self.GetMethodConfig('GetMetrics') - return self._RunMethod( - config, request, global_params=global_params) - - GetMetrics.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.getMetrics', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'startTime'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest', - response_type_name=u'JobMetrics', - supports_download=False, - ) - - def List(self, request, global_params=None): - """List the jobs of a project. - - Args: - request: (DataflowProjectsLocationsJobsListRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (ListJobsResponse) The response message. - """ - config = self.GetMethodConfig('List') - return self._RunMethod( - config, request, global_params=global_params) - - List.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.list', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'filter', u'pageSize', u'pageToken', u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsListRequest', - response_type_name=u'ListJobsResponse', - supports_download=False, - ) - - def Update(self, request, global_params=None): - """Updates the state of an existing Cloud Dataflow job. - - Args: - request: (DataflowProjectsLocationsJobsUpdateRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. - """ - config = self.GetMethodConfig('Update') - return self._RunMethod( - config, request, global_params=global_params) - - Update.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'PUT', - method_id=u'dataflow.projects.locations.jobs.update', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', - request_field=u'job', - request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest', - response_type_name=u'Job', - supports_download=False, - ) - - class ProjectsLocationsService(base_api.BaseApiService): - """Service class for the projects_locations resource.""" - - _NAME = u'projects_locations' - - def __init__(self, client): - super(DataflowV1b3.ProjectsLocationsService, self).__init__(client) - self._upload_configs = { - } - - class ProjectsTemplatesService(base_api.BaseApiService): - """Service class for the projects_templates resource.""" - - _NAME = u'projects_templates' - - def __init__(self, client): - super(DataflowV1b3.ProjectsTemplatesService, self).__init__(client) - self._upload_configs = { - } - - def Create(self, request, global_params=None): - """Creates a Cloud Dataflow job from a template. - - Args: - request: (DataflowProjectsTemplatesCreateRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (Job) The response message. 
- """ - config = self.GetMethodConfig('Create') - return self._RunMethod( - config, request, global_params=global_params) - - Create.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.templates.create', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/templates', - request_field=u'createJobFromTemplateRequest', - request_type_name=u'DataflowProjectsTemplatesCreateRequest', - response_type_name=u'Job', - supports_download=False, - ) - - class ProjectsService(base_api.BaseApiService): - """Service class for the projects resource.""" - - _NAME = u'projects' - - def __init__(self, client): - super(DataflowV1b3.ProjectsService, self).__init__(client) - self._upload_configs = { - } - - def WorkerMessages(self, request, global_params=None): - """Send a worker_message to the service. - - Args: - request: (DataflowProjectsWorkerMessagesRequest) input message - global_params: (StandardQueryParameters, default: None) global arguments - Returns: - (SendWorkerMessagesResponse) The response message. - """ - config = self.GetMethodConfig('WorkerMessages') - return self._RunMethod( - config, request, global_params=global_params) - - WorkerMessages.method_config = lambda: base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.workerMessages', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/WorkerMessages', - request_field=u'sendWorkerMessagesRequest', - request_type_name=u'DataflowProjectsWorkerMessagesRequest', - response_type_name=u'SendWorkerMessagesResponse', - supports_download=False, - )