Repository: incubator-airflow
Updated Branches:
  refs/heads/master 03e8e5d49 -> 0a5364cd4


[AIRFLOW-23] Support for Google Cloud DataProc

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:
- AIRFLOW-23

Spark, Hadoop, PySpark, SparkSQL, Pig and Hive support for DataProc clusters.

Author: Alex Van Boxel <a...@vanboxel.be>

Closes #1532 from alexvanboxel/AIRFLOW-23.
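
For reviewers, a minimal sketch of how the new operators are wired into a DAG. The cluster name, jar URI, GCS paths and schedule below are illustrative assumptions, not taken from this commit:

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

# Cluster name, jar URI and GCS paths are placeholders.
default_args = {
    'start_date': datetime(2016, 5, 1),
    'dataproc_cluster': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
    ],
}

dag = DAG('dataproc_pig_example', default_args=default_args,
          schedule_interval='@daily')

run_pig = DataProcPigOperator(
    task_id='dataproc_pig',
    # '.pig' is in template_ext, so the query can be a templated script file
    query='a_pig_script.pig',
    # 'variables' is a templated field; {{ ds }} is rendered by Jinja at runtime
    variables={'out': 'gs://example/output/{{ ds }}'},
    dag=dag)
```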


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5364cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5364cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5364cd

Branch: refs/heads/master
Commit: 0a5364cd430a024ed9be4e5b2b5d5174c1b98497
Parents: 03e8e5d
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Tue May 31 09:58:32 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Tue May 31 09:58:32 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py              |   1 +
 airflow/contrib/hooks/gcp_dataproc_hook.py     | 145 ++++++
 airflow/contrib/operators/dataproc_operator.py | 464 ++++++++++++++++++++
 3 files changed, 610 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index b4627f5..17a9f29 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -26,6 +26,7 @@ _hooks = {
     'qubole_hook': ['QuboleHook'],
     'gcs_hook': ['GoogleCloudStorageHook'],
     'datastore_hook': ['DatastoreHook'],
+    'gcp_dataproc_hook': ['DataProcHook'],
     'cloudant_hook': ['CloudantHook']
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
new file mode 100644
index 0000000..3dd1f64
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import time
+import uuid
+
+from apiclient.discovery import build
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+class _DataProcJob:
+    def __init__(self, dataproc_api, project_id, job):
+        self.dataproc_api = dataproc_api
+        self.project_id = project_id
+        self.job = dataproc_api.projects().regions().jobs().submit(
+            projectId=self.project_id,
+            region='global',
+            body=job).execute()
+        self.job_id = self.job['reference']['jobId']
+        logging.info('DataProc job %s is %s', self.job_id,
+                     str(self.job['status']['state']))
+
+    def wait_for_done(self):
+        while True:
+            self.job = self.dataproc_api.projects().regions().jobs().get(
+                projectId=self.project_id,
+                region='global',
+                jobId=self.job_id).execute()
+            if 'ERROR' == self.job['status']['state']:
+                print(str(self.job))
+                logging.error('DataProc job %s has errors', self.job_id)
+                logging.error(self.job['status']['details'])
+                logging.debug(str(self.job))
+                return False
+            if 'CANCELLED' == self.job['status']['state']:
+                print(str(self.job))
+                logging.warning('DataProc job %s is cancelled', self.job_id)
+                if 'details' in self.job['status']:
+                    logging.warning(self.job['status']['details'])
+                logging.debug(str(self.job))
+                return False
+            if 'DONE' == self.job['status']['state']:
+                return True
+            logging.debug('DataProc job %s is %s', self.job_id,
+                          str(self.job['status']['state']))
+            time.sleep(5)
+
+    def raise_error(self, message=None):
+        if 'ERROR' == self.job['status']['state']:
+            if message is None:
+                message = "Google DataProc job has error"
+            raise Exception(message + ": " + str(self.job['status']['details']))
+
+    def get(self):
+        return self.job
+
+
+class _DataProcJobBuilder:
+    def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties):
+        name = task_id + "_" + str(uuid.uuid1())[:8]
+        self.job_type = job_type
+        self.job = {
+            "job": {
+                "reference": {
+                    "projectId": project_id,
+                    "jobId": name,
+                },
+                "placement": {
+                    "clusterName": dataproc_cluster
+                },
+                job_type: {
+                }
+            }
+        }
+        if properties is not None:
+            self.job["job"][job_type]["properties"] = properties
+
+    def add_variables(self, variables):
+        if variables is not None:
+            self.job["job"][self.job_type]["scriptVariables"] = variables
+
+    def add_args(self, args):
+        if args is not None:
+            self.job["job"][self.job_type]["args"] = args
+
+    def add_query(self, query):
+        self.job["job"][self.job_type]["queryList"] = {'queries': [query]}
+
+    def add_jar_file_uris(self, jars):
+        if jars is not None:
+            self.job["job"][self.job_type]["jarFileUris"] = jars
+
+    def add_archive_uris(self, archives):
+        if archives is not None:
+            self.job["job"][self.job_type]["archiveUris"] = archives
+
+    def set_main(self, main_jar, main_class):
+        if main_class is not None and main_jar is not None:
+            raise Exception("Set either main_jar or main_class")
+        if main_jar:
+            self.job["job"][self.job_type]["mainJarFileUri"] = main_jar
+        else:
+            self.job["job"][self.job_type]["mainClass"] = main_class
+
+    def set_python_main(self, main):
+        self.job["job"][self.job_type]["mainPythonFileUri"] = main
+
+    def build(self):
+        return self.job
+
+
+class DataProcHook(GoogleCloudBaseHook):
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_conn(self):
+        """
+        Returns a Google Cloud DataProc service object.
+        """
+        http_authorized = self._authorize()
+        return build('dataproc', 'v1', http=http_authorized)
+
+    def submit(self, project_id, job):
+        submitted = _DataProcJob(self.get_conn(), project_id, job)
+        if not submitted.wait_for_done():
+            submitted.raise_error("DataProcTask has errors")
+
+    def create_job_template(self, task_id, dataproc_cluster, job_type, properties):
+        return _DataProcJobBuilder(self.project_id, task_id, dataproc_cluster,
+                                   job_type, properties)

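Aside (not part of the diff): a sketch of driving the new hook directly, assuming the default 'google_cloud_default' connection and a hypothetical cluster and query:

```
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

hook = DataProcHook(gcp_conn_id='google_cloud_default')

# create_job_template() returns a builder keyed on the Dataproc job type
# ('pigJob', 'hiveJob', 'sparkSqlJob', 'sparkJob', 'hadoopJob', 'pysparkJob');
# the task_id becomes the prefix of the generated jobId.
job = hook.create_job_template(task_id='adhoc_hive',
                               dataproc_cluster='cluster-1',
                               job_type='hiveJob',
                               properties=None)
job.add_query('SHOW DATABASES;')

# submit() blocks, polling every 5 seconds, and raises if the job ends in ERROR.
hook.submit(hook.project_id, job.build())
```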
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
new file mode 100644
index 0000000..4d3e1e7
--- /dev/null
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -0,0 +1,464 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataProcPigOperator(BaseOperator):
+    """
+    Start a Pig query Job on a Cloud DataProc cluster. The parameters of the
+    operation will be passed to the cluster.
+
+    It's a good practice to define dataproc_* parameters in the default_args
+    of the dag like the cluster name and UDFs.
+
+    ```
+    default_args = {
+        'dataproc_cluster': 'cluster-1',
+        'dataproc_pig_jars': [
+            'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
+            'gs://example/udf/jar/gpig/1.2/gpig.jar'
+        ]
+    }
+    ```
+
+    You can pass a pig script as a string or a file reference. Use variables to
+    pass on variables for the pig script to be resolved on the cluster, or use
+    the parameters to be resolved in the script as template parameters.
+
+    ```
+    t1 = DataProcPigOperator(
+        task_id='dataproc_pig',
+        query='a_pig_script.pig',
+        variables={'out': 'gs://example/output/{{ ds }}'},
+        dag=dag)
+    ```
+    """
+    template_fields = ['query', 'variables']
+    template_ext = ('.pg', '.pig',)
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            query,
+            variables=None,
+            dataproc_cluster='cluster-1',
+            dataproc_pig_properties=None,
+            dataproc_pig_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcPigOperator.
+
+        For more detail on job submission have a look at the reference:
+
+        https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
+
+        :param query: The query or reference to the query file (pg or pig extension).
+        :type query: string
+        :param variables: Map of named parameters for the query.
+        :type variables: dict
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_pig_properties: Map for the Pig properties. Ideal to
+            put in default arguments.
+        :type dataproc_pig_properties: dict
+        :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_pig_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcPigOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.query = query
+        self.variables = variables
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_pig_properties
+        self.dataproc_jars = dataproc_pig_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "pigJob", self.dataproc_properties)
+
+        job.add_query(self.query)
+        job.add_variables(self.variables)
+        job.add_jar_file_uris(self.dataproc_jars)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcHiveOperator(BaseOperator):
+    """
+    Start a Hive query Job on a Cloud DataProc cluster.
+    """
+    template_fields = ['query', 'variables']
+    template_ext = ('.q',)
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            query,
+            variables=None,
+            dataproc_cluster='cluster-1',
+            dataproc_hive_properties=None,
+            dataproc_hive_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcHiveOperator.
+
+        :param query: The query or reference to the query file (q extension).
+        :type query: string
+        :param variables: Map of named parameters for the query.
+        :type variables: dict
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_hive_properties: Map for the Hive properties. Ideal to
+            put in default arguments.
+        :type dataproc_hive_properties: dict
+        :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_hive_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcHiveOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.query = query
+        self.variables = variables
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_hive_properties
+        self.dataproc_jars = dataproc_hive_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "hiveJob", self.dataproc_properties)
+
+        job.add_query(self.query)
+        job.add_variables(self.variables)
+        job.add_jar_file_uris(self.dataproc_jars)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcSparkSqlOperator(BaseOperator):
+    """
+    Start a Spark SQL query Job on a Cloud DataProc cluster.
+    """
+    template_fields = ['query', 'variables']
+    template_ext = ('.q',)
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            query,
+            variables=None,
+            dataproc_cluster='cluster-1',
+            dataproc_spark_properties=None,
+            dataproc_spark_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcSparkSqlOperator.
+
+        :param query: The query or reference to the query file (q extension).
+        :type query: string
+        :param variables: Map of named parameters for the query.
+        :type variables: dict
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_spark_properties: Map for the Spark SQL properties. Ideal to
+            put in default arguments.
+        :type dataproc_spark_properties: dict
+        :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_spark_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.query = query
+        self.variables = variables
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_spark_properties
+        self.dataproc_jars = dataproc_spark_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "sparkSqlJob", self.dataproc_properties)
+
+        job.add_query(self.query)
+        job.add_variables(self.variables)
+        job.add_jar_file_uris(self.dataproc_jars)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcSparkOperator(BaseOperator):
+    """
+    Start a Spark Job on a Cloud DataProc cluster.
+    """
+
+    template_fields = ['arguments']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            main_jar=None,
+            main_class=None,
+            arguments=None,
+            archives=None,
+            dataproc_cluster='cluster-1',
+            dataproc_spark_properties=None,
+            dataproc_spark_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcSparkOperator.
+
+        :param main_jar: URI of the job jar provisioned on Cloud Storage. (use
+            this or the main_class, not both together).
+        :type main_jar: string
+        :param main_class: Name of the job class. (use this or the main_jar,
+            not both together).
+        :type main_class: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the
+            work directory. Should be stored in Cloud Storage.
+        :type archives: list
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_spark_properties: Map for the Spark properties. Ideal to
+            put in default arguments.
+        :type dataproc_spark_properties: dict
+        :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_spark_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcSparkOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_spark_properties
+        self.dataproc_jars = dataproc_spark_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "sparkJob", self.dataproc_properties)
+
+        job.set_main(self.main_jar, self.main_class)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcHadoopOperator(BaseOperator):
+    """
+    Start a Hadoop Job on a Cloud DataProc cluster.
+    """
+
+    template_fields = ['arguments']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            main_jar=None,
+            main_class=None,
+            arguments=None,
+            archives=None,
+            dataproc_cluster='cluster-1',
+            dataproc_hadoop_properties=None,
+            dataproc_hadoop_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcHadoopOperator.
+
+        :param main_jar: URI of the job jar provisioned on Cloud Storage. (use
+            this or the main_class, not both together).
+        :type main_jar: string
+        :param main_class: Name of the job class. (use this or the main_jar,
+            not both together).
+        :type main_class: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the
+            work directory. Should be stored in Cloud Storage.
+        :type archives: list
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal
+            to put in default arguments.
+        :type dataproc_hadoop_properties: dict
+        :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_hadoop_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcHadoopOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_hadoop_properties
+        self.dataproc_jars = dataproc_hadoop_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "hadoopJob", self.dataproc_properties)
+
+        job.set_main(self.main_jar, self.main_class)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())
+
+
+class DataProcPySparkOperator(BaseOperator):
+    """
+    Start a PySpark Job on a Cloud DataProc cluster.
+    """
+
+    template_fields = ['arguments']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            main,
+            arguments=None,
+            archives=None,
+            dataproc_cluster='cluster-1',
+            dataproc_pyspark_properties=None,
+            dataproc_pyspark_jars=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataProcPySparkOperator.
+
+        :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of
+            the main Python file to use as the driver. Must be a .py file.
+        :type main: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the
+            work directory. Should be stored in Cloud Storage.
+        :type archives: list
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal
+            to put in default arguments.
+        :type dataproc_pyspark_properties: dict
+        :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage
+            (example: for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_pyspark_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.main = main
+        self.arguments = arguments
+        self.archives = archives
+        self.dataproc_cluster = dataproc_cluster
+        self.dataproc_properties = dataproc_pyspark_properties
+        self.dataproc_jars = dataproc_pyspark_jars
+
+    def execute(self, context):
+        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        job = hook.create_job_template(self.task_id, self.dataproc_cluster,
+                                       "pysparkJob", self.dataproc_properties)
+
+        job.set_python_main(self.main)
+        job.add_args(self.arguments)
+        job.add_jar_file_uris(self.dataproc_jars)
+        job.add_archive_uris(self.archives)
+
+        hook.submit(hook.project_id, job.build())

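Aside (not part of the diff): the JVM-based and PySpark operators follow the same pattern as the query operators; the jar path, bucket names and cluster name below are assumptions:

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataProcPySparkOperator, DataProcSparkOperator)

dag = DAG('dataproc_spark_example',
          start_date=datetime(2016, 5, 1),
          schedule_interval=None)

spark_pi = DataProcSparkOperator(
    task_id='spark_pi',
    # set_main() accepts either main_class or main_jar, never both
    main_class='org.apache.spark.examples.SparkPi',
    dataproc_spark_jars=['file:///usr/lib/spark/lib/spark-examples.jar'],
    arguments=['1000'],
    dataproc_cluster='cluster-1',
    dag=dag)

wordcount = DataProcPySparkOperator(
    task_id='pyspark_wordcount',
    main='gs://example-bucket/jobs/wordcount.py',  # must be a .py file
    arguments=['gs://example-bucket/input/', 'gs://example-bucket/output/'],
    dataproc_cluster='cluster-1',
    dag=dag)

wordcount.set_upstream(spark_pi)
```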