Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3454d691f -> bb09c07b6
Fixing inconsistencies in PipelineOptions

The following options have changed:
* job_name - Now defaults to 'beamapp-username-date-microseconds'. A test
  was added.
* staging_location and temp_location - Previously, temp_location defaulted
  to staging_location. The fallback now goes the other way: temp_location
  is required, and staging_location defaults to it. The tests reflect that.
* The machine_type alias of worker_machine_type has been removed.
* The disk_type alias of worker_disk_type has been removed.
* The disk_source_image option has been removed.
* The no_save_main_session option has been removed.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7

Branch: refs/heads/python-sdk
Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1
Parents: 3454d69
Author: Pablo <pabl...@google.com>
Authored: Tue Dec 6 18:01:54 2016 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Wed Dec 21 15:14:52 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 45 ++++++++++++--------
 .../apache_beam/internal/apiclient_test.py      |  6 +++
 sdks/python/apache_beam/utils/options.py        | 33 ++++++--------
 .../utils/pipeline_options_validator.py         | 11 ++---
 .../utils/pipeline_options_validator_test.py    |  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
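Before the diff itself, a standalone sketch of the new default naming
scheme: 'beamapp', the lowercased local user name, and a UTC timestamp with
microseconds. This mirrors the Job.default_job_name helper added to
apiclient.py below; the print line is only illustrative.

import getpass
from datetime import datetime

def default_job_name():
  # 'beamapp' + lowercased user name + UTC 'MMDDhhmmss-microseconds' stamp;
  # the new test below asserts beamapp-[a-z]*-[0-9]{10}-[0-9]{6}.
  user_name = getpass.getuser().lower()
  date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
  return '{}-{}-{}'.format('beamapp', user_name, date_component)

print(default_job_name())  # e.g. beamapp-pablo-1206180154-000123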

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index f1341a7..3a9ba46 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -18,6 +18,8 @@
 """Dataflow client utility functions."""
 
 import codecs
+from datetime import datetime
+import getpass
 import json
 import logging
 import os
@@ -46,10 +48,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-COMPUTE_API_SERVICE = 'compute.googleapis.com'
-STORAGE_API_SERVICE = 'storage.googleapis.com'
-
 
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
@@ -121,11 +119,13 @@ class Environment(object):
     self.worker_options = options.view_as(WorkerOptions)
     self.debug_options = options.view_as(DebugOptions)
     self.proto = dataflow.Environment()
-    self.proto.clusterManagerApiService = COMPUTE_API_SERVICE
-    self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE
+    self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+    self.proto.dataset = '{}/cloud_dataflow'.format(
+        GoogleCloudOptions.BIGQUERY_API_SERVICE)
     self.proto.tempStoragePrefix = (
-        self.google_cloud_options.temp_location.replace('gs:/',
-                                                        STORAGE_API_SERVICE))
+        self.google_cloud_options.temp_location.replace(
+            'gs:/',
+            GoogleCloudOptions.STORAGE_API_SERVICE))
     # User agent information.
     self.proto.userAgent = dataflow.Environment.UserAgentValue()
     self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
@@ -165,7 +165,7 @@ class Environment(object):
           dataflow.Package(
               location='%s/%s' % (
                   self.google_cloud_options.staging_location.replace(
-                      'gs:/', STORAGE_API_SERVICE),
+                      'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
                   package),
               name=package))
 
@@ -174,7 +174,7 @@ class Environment(object):
         packages=package_descriptors,
         taskrunnerSettings=dataflow.TaskRunnerSettings(
             parallelWorkerSettings=dataflow.WorkerSettings(
-                baseUrl='https://dataflow.googleapis.com',
+                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
                 servicePath=self.google_cloud_options.dataflow_endpoint)))
     pool.autoscalingSettings = dataflow.AutoscalingSettings()
     # Set worker pool options received through command line.
@@ -195,8 +195,6 @@ class Environment(object):
       pool.diskSizeGb = self.worker_options.disk_size_gb
     if self.worker_options.disk_type:
       pool.diskType = self.worker_options.disk_type
-    if self.worker_options.disk_source_image:
-      pool.diskSourceImage = self.worker_options.disk_source_image
     if self.worker_options.zone:
       pool.zone = self.worker_options.zone
     if self.worker_options.network:
@@ -299,10 +297,23 @@ class Job(object):
         json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
         indent=2, sort_keys=True)
 
+  @staticmethod
+  def default_job_name(job_name):
+    if job_name is None:
+      user_name = getpass.getuser().lower()
+      date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
+      app_name = 'beamapp'
+      job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
+    return job_name
+
   def __init__(self, options):
     self.options = options
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    required_google_cloud_options = ['project', 'job_name', 'staging_location']
+    if not self.google_cloud_options.job_name:
+      self.google_cloud_options.job_name = self.default_job_name(
+          self.google_cloud_options.job_name)
+
+    required_google_cloud_options = ['project', 'job_name', 'temp_location']
     missing = [
         option for option in required_google_cloud_options
         if not getattr(self.google_cloud_options, option)]
@@ -310,11 +321,11 @@ class Job(object):
       raise ValueError(
          'Missing required configuration parameters: %s' % missing)
 
-    if not self.google_cloud_options.temp_location:
-      logging.info('Defaulting to the staging_location as temp_location: %s',
-                   self.google_cloud_options.staging_location)
+    if not self.google_cloud_options.staging_location:
+      logging.info('Defaulting to the temp_location as staging_location: %s',
+                   self.google_cloud_options.temp_location)
       (self.google_cloud_options
-       .temp_location) = self.google_cloud_options.staging_location
+       .staging_location) = self.google_cloud_options.temp_location
 
     # Make the staging and temp locations job name and time specific. This is
     # needed to avoid clashes between job submissions using the same staging
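The hunk above is the behavioral core of the change: temp_location is now
the required option and staging_location inherits from it, not the other
way around. A minimal sketch of the new defaulting, using the option
classes from this branch (the bucket paths are made up):

from apache_beam.utils.options import GoogleCloudOptions, PipelineOptions

options = PipelineOptions(['--project', 'my-project',
                           '--temp_location', 'gs://my-bucket/tmp'])
gcloud_options = options.view_as(GoogleCloudOptions)

# Mirrors the Job.__init__ logic above: staging_location is optional and
# falls back to temp_location when unset.
if not gcloud_options.staging_location:
  gcloud_options.staging_location = gcloud_options.temp_location
assert gcloud_options.staging_location == 'gs://my-bucket/tmp'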

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 31b2dad..75d00e0 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -16,6 +16,7 @@
 #
 """Unit tests for the apiclient module."""
 
+import re
 import unittest
 
 from apache_beam.utils.options import PipelineOptions
@@ -32,6 +33,11 @@ class UtilTest(unittest.TestCase):
         pipeline_options,
         DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
 
+  def test_default_job_name(self):
+    job_name = apiclient.Job.default_job_name(None)
+    regexp = 'beamapp-[a-z]*-[0-9]{10}-[0-9]{6}'
+    self.assertTrue(re.match(regexp, job_name))
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index eaa1065..085c09c 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -224,11 +224,16 @@ class TypeOptions(PipelineOptions):
 class GoogleCloudOptions(PipelineOptions):
   """Google Cloud Dataflow service execution options."""
 
+  BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+  COMPUTE_API_SERVICE = 'compute.googleapis.com'
+  STORAGE_API_SERVICE = 'storage.googleapis.com'
+  DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--dataflow_endpoint',
-        default='https://dataflow.googleapis.com',
+        default=cls.DATAFLOW_ENDPOINT,
         help=
         ('The URL for the Dataflow API. If not set, the default public URL '
          'will be used.'))
@@ -251,7 +256,6 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
-
     # Options for using service account credentials.
     parser.add_argument('--service_account_name',
                         default=None,
                         help='Name of the service account for Google APIs.')
@@ -272,10 +276,10 @@ class GoogleCloudOptions(PipelineOptions):
     errors = []
     if validator.is_service_runner():
       errors.extend(validator.validate_cloud_options(self))
-      errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-      if getattr(self, 'temp_location',
-                 None) or getattr(self, 'staging_location', None) is None:
-        errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      if getattr(self, 'staging_location',
+                 None) or getattr(self, 'temp_location', None) is None:
+        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
 
     if self.view_as(DebugOptions).dataflow_job_file:
       if self.view_as(GoogleCloudOptions).template_location:
@@ -312,9 +316,8 @@ class WorkerOptions(PipelineOptions):
         default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
         help=
         ('If and how to auotscale the workerpool.'))
-    # TODO(silviuc): Remove --machine_type variant of the flag.
     parser.add_argument(
-        '--worker_machine_type', '--machine_type',
+        '--worker_machine_type',
         dest='machine_type',
         default=None,
         help=('Machine type to create Dataflow worker VMs as. See '
@@ -329,21 +332,12 @@ class WorkerOptions(PipelineOptions):
         help=
         ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
          'If not set, the Dataflow service will use a reasonable default.'))
-    # TODO(silviuc): Remove --disk_type variant of the flag.
     parser.add_argument(
-        '--worker_disk_type', '--disk_type',
+        '--worker_disk_type',
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
     parser.add_argument(
-        '--disk_source_image',
-        default=None,
-        help=
-        ('Disk source image to use by VMs for jobs. See '
-         'https://developers.google.com/compute/docs/images for further '
-         'details. If not set, the Dataflow service will use a reasonable '
-         'default.'))
-    parser.add_argument(
         '--zone',
         default=None,
         help=(
@@ -461,9 +455,6 @@ class SetupOptions(PipelineOptions):
         'Some workflows do not need the session state if for instance all '
         'their functions/classes are defined in proper modules (not __main__)'
         ' and the modules are importable in the worker. '))
-    parser.add_argument('--no_save_main_session',
-                        dest='save_main_session',
-                        action='store_false')
     parser.add_argument(
         '--sdk_location',
         default='default',
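With the aliases removed, only the long flag spellings parse, but both keep
dest='machine_type' and dest='disk_type', so code that reads the parsed
options is unchanged. A quick sketch (the machine and disk values are
illustrative only):

from apache_beam.utils.options import PipelineOptions, WorkerOptions

options = PipelineOptions(['--worker_machine_type', 'n1-standard-4',
                           '--worker_disk_type', 'pd-ssd'])
worker_options = options.view_as(WorkerOptions)

# dest='machine_type' and dest='disk_type' still back the new-style flags,
# so attribute access is the same; --machine_type/--disk_type now fail.
print(worker_options.machine_type)  # n1-standard-4
print(worker_options.disk_type)     # pd-ssd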

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index b7b2978..c248022 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -17,7 +17,6 @@
 """Pipeline options validator.
 """
-
 import re
 
 from apache_beam.utils.options import DebugOptions
@@ -144,12 +143,10 @@ class PipelineOptionsValidator(object):
   def validate_cloud_options(self, view):
     """Validates job_name and project arguments."""
     errors = []
-    job_name = view.job_name
-    if job_name is None:
-      errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'job_name'))
-    elif not self.is_full_string_match(self.JOB_PATTERN, job_name):
-      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
-
+    if (view.job_name and
+        not self.is_full_string_match(self.JOB_PATTERN, view.job_name)):
+      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME,
+                                         view.job_name))
     project = view.project
     if project is None:
       errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
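Net effect of the hunk above: a missing job_name is no longer a validation
error, since the generated default covers it, while a present-but-malformed
name is still rejected. A toy sketch of that control flow follows; the
pattern here is an assumption based on Dataflow's job-name rules, and the
real one lives in PipelineOptionsValidator.JOB_PATTERN.

import re

# Assumed job-name rule: starts with a lowercase letter, then lowercase
# letters, digits, or dashes, not ending in a dash.
JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'

def validate_job_name(job_name):
  errors = []
  # New behavior: None passes through; a default name is generated later.
  if job_name and not re.match(JOB_PATTERN + '$', job_name):
    errors.append('Invalid job_name: %s' % job_name)
  return errors

assert validate_job_name(None) == []       # previously ERR_MISSING_OPTION
assert validate_job_name('foo-bar') == []
assert validate_job_name('FOO') != []      # still invalid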

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index bffbeca..5e93ff6 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -70,7 +70,7 @@ class SetupTest(unittest.TestCase):
     self.assertEqual(
         self.check_errors_for_arguments(
             errors,
-            ['project', 'job_name', 'staging_location', 'temp_location']),
+            ['project', 'staging_location', 'temp_location']),
         [])
 
   def test_gcs_path(self):
@@ -91,13 +91,13 @@ class SetupTest(unittest.TestCase):
     test_cases = [
         {'temp_location': None,
          'staging_location': 'gs://foo/bar',
-         'errors': []},
+         'errors': ['temp_location']},
         {'temp_location': None,
          'staging_location': None,
          'errors': ['staging_location', 'temp_location']},
         {'temp_location': 'gs://foo/bar',
          'staging_location': None,
-         'errors': ['staging_location']},
+         'errors': []},
         {'temp_location': 'gs://foo/bar',
          'staging_location': 'gs://ABC/bar',
          'errors': ['staging_location']},
@@ -172,7 +172,7 @@ class SetupTest(unittest.TestCase):
       return validator
 
     test_cases = [
-        {'job_name': None, 'errors': ['job_name']},
+        {'job_name': None, 'errors': []},
         {'job_name': '12345', 'errors': ['job_name']},
         {'job_name': 'FOO', 'errors': ['job_name']},
         {'job_name': 'foo:bar', 'errors': ['job_name']},