Repository: incubator-airflow
Updated Branches:
  refs/heads/master 03e8e5d49 -> 0a5364cd4
[AIRFLOW-23] Support for Google Cloud DataProc

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

- AIRFLOW-23 Spark, Hadoop, PySpark, SparkSQL, Pig and Hive support for DataProc clusters

Author: Alex Van Boxel <a...@vanboxel.be>

Closes #1532 from alexvanboxel/AIRFLOW-23.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5364cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5364cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5364cd

Branch: refs/heads/master
Commit: 0a5364cd430a024ed9be4e5b2b5d5174c1b98497
Parents: 03e8e5d
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Tue May 31 09:58:32 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Tue May 31 09:58:32 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py              |   1 +
 airflow/contrib/hooks/gcp_dataproc_hook.py     | 145 ++++
 airflow/contrib/operators/dataproc_operator.py | 464 ++++++++++++++++++++
 3 files changed, 610 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index b4627f5..17a9f29 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -26,6 +26,7 @@ _hooks = {
     'qubole_hook': ['QuboleHook'],
     'gcs_hook': ['GoogleCloudStorageHook'],
     'datastore_hook': ['DatastoreHook'],
+    'gcp_dataproc_hook': ['DataProcHook'],
     'cloudant_hook': ['CloudantHook']
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
new file mode 100644
index 0000000..3dd1f64
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import time
+import uuid
+
+from apiclient.discovery import build
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+class _DataProcJob:
+    def __init__(self, dataproc_api, project_id, job):
+        self.dataproc_api = dataproc_api
+        self.project_id = project_id
+        self.job = dataproc_api.projects().regions().jobs().submit(
+            projectId=self.project_id,
+            region='global',
+            body=job).execute()
+        self.job_id = self.job['reference']['jobId']
+        logging.info('DataProc job %s is %s', self.job_id,
+                     str(self.job['status']['state']))
+
+    def wait_for_done(self):
+        while True:
+            self.job = self.dataproc_api.projects().regions().jobs().get(
+                projectId=self.project_id,
+                region='global',
+                jobId=self.job_id).execute()
+            if 'ERROR' == self.job['status']['state']:
+                print(str(self.job))
+                logging.error('DataProc job %s has errors', self.job_id)
+                logging.error(self.job['status']['details'])
+                logging.debug(str(self.job))
+                return False
+            if 'CANCELLED' == self.job['status']['state']:
+                print(str(self.job))
+                logging.warning('DataProc job %s is cancelled', self.job_id)
+                if 'details' in self.job['status']:
+                    logging.warning(self.job['status']['details'])
+                logging.debug(str(self.job))
+                return False
+            if 'DONE' == self.job['status']['state']:
+                return True
+            logging.debug('DataProc job %s is %s', self.job_id,
+                          str(self.job['status']['state']))
+            time.sleep(5)
+
+    def raise_error(self, message=None):
+        if 'ERROR' == self.job['status']['state']:
+            if message is None:
+                message = "Google DataProc job has error"
+            raise Exception(message + ": " + str(self.job['status']['details']))
+
+    def get(self):
+        return self.job
+
+
+class _DataProcJobBuilder:
+    def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties):
+        name = task_id + "_" + str(uuid.uuid1())[:8]
+        self.job_type = job_type
+        self.job = {
+            "job": {
+                "reference": {
+                    "projectId": project_id,
+                    "jobId": name,
+                },
+                "placement": {
+                    "clusterName": dataproc_cluster
+                },
+                job_type: {
+                }
+            }
+        }
+        if properties is not None:
+            self.job["job"][job_type]["properties"] = properties
+
+    def add_variables(self, variables):
+        if variables is not None:
+            self.job["job"][self.job_type]["scriptVariables"] = variables
+
+    def add_args(self, args):
+        if args is not None:
+            self.job["job"][self.job_type]["args"] = args
+
+    def add_query(self, query):
+        self.job["job"][self.job_type]["queryList"] = {'queries': [query]}
+
+    def add_jar_file_uris(self, jars):
+        if jars is not None:
+            self.job["job"][self.job_type]["jarFileUris"] = jars
+
+    def add_archive_uris(self, archives):
+        if archives is not None:
+            self.job["job"][self.job_type]["archiveUris"] = archives
+
+    def set_main(self, main_jar, main_class):
+        if main_class is not None and main_jar is not None:
+            raise Exception("Set either main_jar or main_class")
+        if main_jar:
+            self.job["job"][self.job_type]["mainJarFileUri"] = main_jar
+        else:
+            self.job["job"][self.job_type]["mainClass"] = main_class
+
+    def set_python_main(self, main):
+        self.job["job"][self.job_type]["mainPythonFileUri"] = main
+
+    def build(self):
+        return self.job
+
+
+class DataProcHook(GoogleCloudBaseHook):
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_conn(self):
+        """
+        Returns a Google Cloud DataProc service object.
+ """ + http_authorized = self._authorize() + return build('dataproc', 'v1', http=http_authorized) + + def submit(self, project_id, job): + submitted = _DataProcJob(self.get_conn(), project_id, job) + if not submitted.wait_for_done(): + submitted.raise_error("DataProcTask has errors") + + def create_job_template(self, task_id, dataproc_cluster, job_type, properties): + return _DataProcJobBuilder(self.project_id, task_id, dataproc_cluster, job_type, + properties) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py new file mode 100644 index 0000000..4d3e1e7 --- /dev/null +++ b/airflow/contrib/operators/dataproc_operator.py @@ -0,0 +1,464 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class DataProcPigOperator(BaseOperator): + """ + Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation + will be passed to the cluster. + + It's a good practice to define dataproc_* parameters in the default_args of the dag + like the cluster name and UDFs. + + ``` + default_args = { + 'dataproc_cluster': 'cluster-1', + 'dataproc_pig_jars': [ + 'gs://example/udf/jar/datafu/1.2.0/datafu.jar', + 'gs://example/udf/jar/gpig/1.2/gpig.jar' + ] + } + ``` + + You can pass a pig script as string or file reference. Use variables to pass on + variables for the pig script to be resolved on the cluster or use the parameters to + be resolved in the script as template parameters. + + ``` + t1 = DataProcPigOperator( + task_id='dataproc_pig', + query='a_pig_script.pig', + variables={'out': 'gs://example/output/{ds}'}, + dag=dag) + ``` + """ + template_fields = ['query', 'variables'] + template_ext = ('.pg', '.pig',) + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + query, + variables=None, + dataproc_cluster='cluster-1', + dataproc_pig_properties=None, + dataproc_pig_jars=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + Create a new DataProcPigOperator. + + For more detail on about job submission have a look at the reference: + + https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs + + :param query: The query or reference to the query file (pg or pig extension). + :type query: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param dataproc_cluster: The id of the DataProc cluster. + :type dataproc_cluster: string + :param dataproc_pig_properties: Map for the Pig properties. 
Ideal to put in + default arguments + :type dataproc_pig_properties: dict + :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for + UDFs and libs) and are ideal to put in default arguments. + :type dataproc_pig_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + super(DataProcPigOperator, self).__init__(*args, **kwargs) + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.query = query + self.variables = variables + self.dataproc_cluster = dataproc_cluster + self.dataproc_properties = dataproc_pig_properties + self.dataproc_jars = dataproc_pig_jars + + def execute(self, context): + hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob", + self.dataproc_properties) + + job.add_query(self.query) + job.add_variables(self.variables) + job.add_jar_file_uris(self.dataproc_jars) + + hook.submit(hook.project_id, job.build()) + + +class DataProcHiveOperator(BaseOperator): + """ + Start a Hive query Job on a Cloud DataProc cluster. + """ + template_fields = ['query', 'variables'] + template_ext = ('.q',) + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + query, + variables=None, + dataproc_cluster='cluster-1', + dataproc_hive_properties=None, + dataproc_hive_jars=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + Create a new DataProcHiveOperator. + + :param query: The query or reference to the query file (q extension). + :type query: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param dataproc_cluster: The id of the DataProc cluster. + :type dataproc_cluster: string + :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_hive_properties: dict + :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for + UDFs and libs) and are ideal to put in default arguments. + :type dataproc_hive_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + super(DataProcHiveOperator, self).__init__(*args, **kwargs) + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.query = query + self.variables = variables + self.dataproc_cluster = dataproc_cluster + self.dataproc_properties = dataproc_hive_properties + self.dataproc_jars = dataproc_hive_jars + + def execute(self, context): + hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + + job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob", + self.dataproc_properties) + + job.add_query(self.query) + job.add_variables(self.variables) + job.add_jar_file_uris(self.dataproc_jars) + + hook.submit(hook.project_id, job.build()) + + +class DataProcSparkSqlOperator(BaseOperator): + """ + Start a Spark SQL query Job on a Cloud DataProc cluster. 
+ """ + template_fields = ['query', 'variables'] + template_ext = ('.q',) + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + query, + variables=None, + dataproc_cluster='cluster-1', + dataproc_spark_properties=None, + dataproc_spark_jars=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + Create a new DataProcSparkSqlOperator. + + :param query: The query or reference to the query file (q extension). + :type query: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param dataproc_cluster: The id of the DataProc cluster. + :type dataproc_cluster: string + :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_spark_properties: dict + :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_spark_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs) + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.query = query + self.variables = variables + self.dataproc_cluster = dataproc_cluster + self.dataproc_properties = dataproc_spark_properties + self.dataproc_jars = dataproc_spark_jars + + def execute(self, context): + hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + + job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob", + self.dataproc_properties) + + job.add_query(self.query) + job.add_variables(self.variables) + job.add_jar_file_uris(self.dataproc_jars) + + hook.submit(hook.project_id, job.build()) + + +class DataProcSparkOperator(BaseOperator): + """ + Start a Spark Job on a Cloud DataProc cluster. + """ + + template_fields = ['arguments'] + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + main_jar=None, + main_class=None, + arguments=None, + archives=None, + dataproc_cluster='cluster-1', + dataproc_spark_properties=None, + dataproc_spark_jars=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + Create a new DataProcSparkOperator. + + :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or + the main_class, not both together). + :type main_jar: string + :param main_class: Name of the job class. (use this or the main_jar, not both + together). + :type main_class: string + :param arguments: Arguments for the job. + :type arguments: list + :param archives: List of archived files that will be unpacked in the work + directory. Should be stored in Cloud Storage. + :type archives: list + :param dataproc_cluster: The id of the DataProc cluster. + :type dataproc_cluster: string + :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_spark_properties: dict + :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_spark_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. 
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcSparkOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_spark_properties
+        self.dataproc_jars = dataproc_spark_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkJob",
+                                       self.dataproc_properties)
+
+        job.set_main(self.main_jar, self.main_class)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcHadoopOperator(BaseOperator):
+    """
+    Start a Hadoop Job on a Cloud DataProc cluster.
+    """
+
+    template_fields = ['arguments']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            main_jar=None,
+            main_class=None,
+            arguments=None,
+            archives=None,
+            dataproc_cluster='cluster-1',
+            dataproc_hadoop_properties=None,
+            dataproc_hadoop_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcHadoopOperator.
+
+        :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
+            the main_class, not both together).
+        :type main_jar: string
+        :param main_class: Name of the job class. (use this or the main_jar, not both
+            together).
+        :type main_class: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the work
+            directory. Should be stored in Cloud Storage.
+        :type archives: list
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal to put in
+            default arguments
+        :type dataproc_hadoop_properties: dict
+        :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example:
+            for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_hadoop_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcHadoopOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_hadoop_properties
+        self.dataproc_jars = dataproc_hadoop_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hadoopJob",
+                                       self.dataproc_properties)
+
+        job.set_main(self.main_jar, self.main_class)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcPySparkOperator(BaseOperator):
+    """
+    Start a PySpark Job on a Cloud DataProc cluster.
+    """
+
+    template_fields = ['arguments']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            main,
+            arguments=None,
+            archives=None,
+            dataproc_cluster='cluster-1',
+            dataproc_pyspark_properties=None,
+            dataproc_pyspark_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcPySparkOperator.
+
+        :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+            Python file to use as the driver. Must be a .py file.
+        :type main: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the work
+            directory. Should be stored in Cloud Storage.
+        :type archives: list
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal to put in
+            default arguments
+        :type dataproc_pyspark_properties: dict
+        :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
+            for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_pyspark_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main = main
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_pyspark_properties
+        self.dataproc_jars = dataproc_pyspark_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pysparkJob",
+                                       self.dataproc_properties)
+
+        job.set_python_main(self.main)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())
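
For reference, wiring the new operators into a DAG could look like the sketch below. This is only a sketch, not part of the commit: the bucket, script, and output paths are placeholders, the cluster is assumed to be an existing Dataproc cluster named 'cluster-1', and the dataproc_* settings are passed through default_args as the DataProcPigOperator docstring recommends.

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

default_args = {
    'start_date': datetime(2016, 6, 1),
    # Picked up by the operators via apply_defaults.
    'dataproc_cluster': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
    ],
}

dag = DAG('dataproc_example', default_args=default_args)

# Pig script shipped next to the DAG file; loaded and templated because
# '.pig' is in the operator's template_ext.
pig = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{ds}'},
    dag=dag)

# PySpark driver stored in Cloud Storage; arguments go straight to the job.
pyspark = DataProcPySparkOperator(
    task_id='dataproc_pyspark',
    main='gs://example/jobs/wordcount.py',
    arguments=['gs://example/input/', 'gs://example/output/'],
    dag=dag)

pyspark.set_upstream(pig)
```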
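
The hook can also be driven directly, mirroring what the operators do in execute(). A rough sketch, assuming the 'google_cloud_default' connection resolves to a project with a running 'cluster-1' cluster (the task id 'adhoc_hive' is just an illustrative label):

```
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

hook = DataProcHook(gcp_conn_id='google_cloud_default')

# Build a Hive job payload for the Dataproc v1 jobs.submit API and run it;
# submit() blocks until the job is DONE and raises if it ends in ERROR.
job = hook.create_job_template('adhoc_hive', 'cluster-1', 'hiveJob', None)
job.add_query('SHOW DATABASES')
hook.submit(hook.project_id, job.build())
```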