Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6efe2e3ce -> 667a26ce4


[AIRFLOW-1551] Add operator to trigger Jenkins job

Closes #2553 from moe-nadal-ck/AIRFLOW-1551/AddJenkinsOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/667a26ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/667a26ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/667a26ce

Branch: refs/heads/master
Commit: 667a26ce492d944793eb25c72b9f21e41266c7d9
Parents: 6efe2e3
Author: Moe Nadal <moe.na...@creditkarma.com>
Authored: Tue Feb 27 11:51:45 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Tue Feb 27 11:51:49 2018 +0100

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 ...enkins_job_trigger_operator.py.notexecutable |  77 ++++++
 airflow/contrib/hooks/__init__.py               |   1 +
 airflow/contrib/hooks/jenkins_hook.py           |  43 ++++
 .../operators/jenkins_job_trigger_operator.py   | 244 +++++++++++++++++++
 airflow/models.py                               |   1 +
 airflow/www/static/connection_form.js           |  10 +-
 scripts/ci/requirements.txt                     |   1 +
 setup.py                                        |   1 +
 .../contrib/operators/test_jenkins_operator.py  | 143 +++++++++++
 10 files changed, 521 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 25fe61e..e5373aa 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -8,6 +8,7 @@ requirements.txt
 .*log
 .travis.yml
 .*pyc
+.*lock
 docs
 .*md
 dist

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/contrib/example_dags/example_jenkins_job_trigger_operator.py.notexecutable
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_jenkins_job_trigger_operator.py.notexecutable b/airflow/contrib/example_dags/example_jenkins_job_trigger_operator.py.notexecutable
new file mode 100644
index 0000000..6762b9c
--- /dev/null
+++ b/airflow/contrib/example_dags/example_jenkins_job_trigger_operator.py.notexecutable
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+from airflow import DAG
+from airflow.contrib.operators.jenkins_job_trigger_operator import JenkinsJobTriggerOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.contrib.hooks.jenkins_hook import JenkinsHook
+
+from six.moves.urllib.request import Request
+
+import jenkins
+import datetime
+
+
+datetime_start_date = datetime(2017, 6, 1)
+default_args = {
+    "owner": "airflow",
+    "start_date": datetime_start_date,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+    "depends_on_past": False,
+    "concurrency": 8,
+    "max_active_runs": 8
+
+}
+
+
+dag = DAG("test_jenkins", default_args=default_args, schedule_interval=None)
+#This DAG shouldn't be executed and is only here to provide example of how to use the JenkinsJobTriggerOperator
+#(it requires a jenkins server to be executed)
+
+job_trigger = JenkinsJobTriggerOperator(
+    dag=dag,
+    task_id="trigger_job",
+    job_name="generate-merlin-config",
+    parameters={"first_parameter":"a_value", "second_parameter":"18"},
+    #parameters="resources/paremeter.json", You can also pass a path to a json file containing your param
+    jenkins_connection_id="your_jenkins_connection" #The connection must be configured first
+    )
+
+def grabArtifactFromJenkins(**context):
+    """
+    Grab an artifact from the previous job
+    The python-jenkins library doesn't expose a method for that
+    But it's totally possible to build manually the request for that
+    """
+    hook = JenkinsHook("your_jenkins_connection")
+    jenkins_server = hook.get_jenkins_server()
+    url = context['task_instance'].xcom_pull(task_ids='trigger_job')
+    #The JenkinsJobTriggerOperator store the job url in the xcom variable corresponding to the task
+    #You can then use it to access things or to get the job number
+    #This url looks like : http://jenkins_url/job/job_name/job_number/
+    url = url + "artifact/myartifact.xml" #Or any other artifact name
+    request = Request(url)
+    response = jenkins_server.jenkins_open(request)
+    return response #We store the artifact content in a xcom variable for later use
+
+artifact_grabber = PythonOperator(
+    task_id='artifact_grabber',
+    provide_context=True,
+    python_callable=grabArtifactFromJenkins,
+    dag=dag)
+
+artifact_grabber.set_upstream(job_trigger)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 99a1746..83e9f1f 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -50,6 +50,7 @@ _hooks = {
     'fs_hook': ['FSHook'],
     'wasb_hook': ['WasbHook'],
     'gcp_pubsub_hook': ['PubSubHook'],
+    'jenkins_hook': ['JenkinsHook'],
     'aws_dynamodb_hook': ['AwsDynamoDBHook']
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/contrib/hooks/jenkins_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jenkins_hook.py b/airflow/contrib/hooks/jenkins_hook.py
new file mode 100644
index 0000000..66d570e
--- /dev/null
+++ b/airflow/contrib/hooks/jenkins_hook.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+
+import jenkins
+import distutils
+
+
+class JenkinsHook(BaseHook):
+    """
+    Hook to manage connection to jenkins server
+    """
+
+    def __init__(self, conn_id='jenkins_default'):
+        connection = self.get_connection(conn_id)
+        self.connection = connection
+        connectionPrefix = 'http'
+        # connection.extra contains info about using https (true) or http (false)
+        if connection.extra is None or connection.extra == '':
+            connection.extra = 'false'
+            # set a default value to connection.extra
+            # to avoid rising ValueError in strtobool
+        if distutils.util.strtobool(connection.extra):
+            connectionPrefix = 'https'
+        url = '%s://%s:%d' % (connectionPrefix, connection.host, connection.port)
+        self.log.info('Trying to connect to %s', url)
+        self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password)
+
+    def get_jenkins_server(self):
+        return self.jenkins_server

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/contrib/operators/jenkins_job_trigger_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jenkins_job_trigger_operator.py b/airflow/contrib/operators/jenkins_job_trigger_operator.py
new file mode 100644
index 0000000..5227b24
--- /dev/null
+++ b/airflow/contrib/operators/jenkins_job_trigger_operator.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import socket
+import json
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.jenkins_hook import JenkinsHook
+import jenkins
+from jenkins import JenkinsException
+from six.moves.urllib.request import Request, urlopen
+from six.moves.urllib.error import HTTPError, URLError
+
+try:
+    basestring
+except NameError:
+    basestring = str  # For python3 compatibility
+
+
+# TODO Use jenkins_urlopen instead when it will be available
+# in the stable python-jenkins version (> 0.4.15)
+def jenkins_request_with_headers(jenkins_server, req, add_crumb=True):
+    """
+    We need to get the headers in addition to the body answer
+    to get the location from them
+    This function is just a copy of the one present in python-jenkins library
+    with just the return call changed
+    :param jenkins_server: The server to query
+    :param req: The request to execute
+    :param add_crumb: Boolean to indicate if it should add crumb to the request
+    :return:
+    """
+    try:
+        if jenkins_server.auth:
+            req.add_header('Authorization', jenkins_server.auth)
+        if add_crumb:
+            jenkins_server.maybe_add_crumb(req)
+        response = urlopen(req, timeout=jenkins_server.timeout)
+        response_body = response.read()
+        response_headers = response.info()
+        if response_body is None:
+            raise jenkins.EmptyResponseException(
+                "Error communicating with server[%s]: "
+                "empty response" % jenkins_server.server)
+        return {'body': response_body.decode('utf-8'), 'headers': response_headers}
+    except HTTPError as e:
+        # Jenkins's funky authentication means its nigh impossible to
+        # distinguish errors.
+        if e.code in [401, 403, 500]:
+            # six.moves.urllib.error.HTTPError provides a 'reason'
+            # attribute for all python version except for ver 2.6
+            # Falling back to HTTPError.msg since it contains the
+            # same info as reason
+            raise JenkinsException(
+                'Error in request. ' +
+                'Possibly authentication failed [%s]: %s' % (
+                    e.code, e.msg)
+            )
+        elif e.code == 404:
+            raise jenkins.NotFoundException('Requested item could not be found')
+        else:
+            raise
+    except socket.timeout as e:
+        raise jenkins.TimeoutException('Error in request: %s' % e)
+    except URLError as e:
+        # python 2.6 compatibility to ensure same exception raised
+        # since URLError wraps a socket timeout on python 2.6.
+        if str(e.reason) == "timed out":
+            raise jenkins.TimeoutException('Error in request: %s' % e.reason)
+        raise JenkinsException('Error in request: %s' % e.reason)
+
+
+class JenkinsJobTriggerOperator(BaseOperator):
+    """
+    Trigger a Jenkins Job and monitor it's execution.
+    This operator depend on python-jenkins library,
+    version >= 0.4.15 to communicate with jenkins server.
+    You'll also need to configure a Jenkins connection in the connections screen.
+    :param jenkins_connection_id: The jenkins connection to use for this job
+    :type jenkins_connection_id: string
+    :param job_name: The name of the job to trigger
+    :type job_name: string
+    :param parameters: The parameters block to provide to jenkins
+    :type parameters: string
+    :param sleep_time: How long will the operator sleep between each status
+    request for the job (min 1, default 10)
+    :type sleep_time: int
+    :param max_try_before_job_appears: The maximum number of requests to make
+        while waiting for the job to appears on jenkins server (default 10)
+    :type max_try_before_job_appears: int
+    """
+    template_fields = ('parameters',)
+    template_ext = ('.json',)
+    ui_color = '#f9ec86'
+
+    @apply_defaults
+    def __init__(self,
+                 jenkins_connection_id,
+                 job_name,
+                 parameters="",
+                 sleep_time=10,
+                 max_try_before_job_appears=10,
+                 *args,
+                 **kwargs):
+        super(JenkinsJobTriggerOperator, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.parameters = parameters
+        if sleep_time < 1:
+            sleep_time = 1
+        self.sleep_time = sleep_time
+        self.jenkins_connection_id = jenkins_connection_id
+        self.max_try_before_job_appears = max_try_before_job_appears
+
+    def build_job(self, jenkins_server):
+        """
+        This function makes an API call to Jenkins to trigger a build for 'job_name'
+        It returned a dict with 2 keys : body and headers.
+        headers contains also a dict-like object which can be queried to get
+        the location to poll in the queue.
+        :param jenkins_server: The jenkins server where the job should be triggered
+        :return: Dict containing the response body (key body)
+        and the headers coming along (headers)
+        """
+        # Warning if the parameter is too long, the URL can be longer than
+        # the maximum allowed size
+        if self.parameters and isinstance(self.parameters, basestring):
+            import ast
+            self.parameters = ast.literal_eval(self.parameters)
+
+        if not self.parameters:
+            # We need a None to call the non parametrized jenkins api end point
+            self.parameters = None
+
+        request = Request(jenkins_server.build_job_url(self.job_name,
+                                                       self.parameters, None), b'')
+        return jenkins_request_with_headers(jenkins_server, request)
+
+    def poll_job_in_queue(self, location, jenkins_server):
+        """
+        This method poll the jenkins queue until the job is executed.
+        When we trigger a job through an API call,
+        the job is first put in the queue without having a build number assigned.
+        Thus we have to wait the job exit the queue to know its build number.
+        To do so, we have to add /api/json (or /api/xml) to the location
+        returned by the build_job call and poll this file.
+        When a 'executable' block appears in the json, it means the job execution started
+        and the field 'number' then contains the build number.
+        :param location: Location to poll, returned in the header of the build_job call
+        :param jenkins_server: The jenkins server to poll
+        :return: The build_number corresponding to the triggered job
+        """
+        try_count = 0
+        location = location + '/api/json'
+        # TODO Use get_queue_info instead
+        # once it will be available in python-jenkins (v > 0.4.15)
+        self.log.info('Polling jenkins queue at the url %s', location)
+        while try_count < self.max_try_before_job_appears:
+            location_answer = jenkins_request_with_headers(jenkins_server,
+                                                           Request(location))
+            if location_answer is not None:
+                json_response = json.loads(location_answer['body'])
+                if 'executable' in json_response:
+                    build_number = json_response['executable']['number']
+                    self.log.info('Job executed on Jenkins side with the build number %s',
+                                  build_number)
+                    return build_number
+            try_count += 1
+            time.sleep(self.sleep_time)
+        raise AirflowException("The job hasn't been executed"
+                               " after polling the queue %d times",
+                               self.max_try_before_job_appears)
+
+    def get_hook(self):
+        return JenkinsHook(self.jenkins_connection_id)
+
+    def execute(self, context):
+        if not self.jenkins_connection_id:
+            self.log.error(
+                'Please specify the jenkins connection id to use.'
+                'You must create a Jenkins connection before'
+                ' being able to use this operator')
+            raise AirflowException('The jenkins_connection_id parameter is missing,'
+                                   'impossible to trigger the job')
+
+        if not self.job_name:
+            self.log.error("Please specify the job name to use in the job_name parameter")
+            raise AirflowException('The job_name parameter is missing,'
+                                   'impossible to trigger the job')
+
+        self.log.info(
+            'Triggering the job %s on the jenkins : %s with the parameters : %s',
+            self.job_name, self.jenkins_connection_id, self.parameters)
+        jenkins_server = self.get_hook().get_jenkins_server()
+        jenkins_response = self.build_job(jenkins_server)
+        build_number = self.poll_job_in_queue(
+            jenkins_response['headers']['Location'], jenkins_server)
+
+        time.sleep(self.sleep_time)
+        keep_polling_job = True
+        build_info = None
+        while keep_polling_job:
+            try:
+                build_info = jenkins_server.get_build_info(name=self.job_name,
+                                                           number=build_number)
+                if build_info['result'] is not None:
+                    keep_polling_job = False
+                    # Check if job had errors.
+                    if build_info['result'] != 'SUCCESS':
+                        raise AirflowException(
+                            'Jenkins job failed, final state : %s.'
+                            'Find more information on job url : %s'
+                            % (build_info['result'], build_info['url']))
+                else:
+                    self.log.info('Waiting for job to complete : %s , build %s',
+                                  self.job_name, build_number)
+                    time.sleep(self.sleep_time)
+            except jenkins.NotFoundException as err:
+                raise AirflowException(
+                    'Jenkins job status check failed. Final error was: %s'
+                    % err.resp.status)
+            except jenkins.JenkinsException as err:
+                raise AirflowException(
+                    'Jenkins call failed with error : %s, if you have parameters '
+                    'double check them, jenkins sends back '
+                    'this exception for unknown parameters'
+                    'You can also check logs for more details on this exception '
+                    '(jenkins_url/log/rss)', str(err))
+        if build_info:
+            # If we can we return the url of the job
+            # for later use (like retrieving an artifact)
+            return build_info['url']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f9f3fbc..f2c3f2e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -546,6 +546,7 @@ class Connection(Base, LoggingMixin):
         ('hive_metastore', 'Hive Metastore Thrift',),
         ('hiveserver2', 'Hive Server 2 Thrift',),
         ('jdbc', 'Jdbc Connection',),
+        ('jenkins', 'Jenkins'),
         ('mysql', 'MySQL',),
         ('postgres', 'Postgres',),
         ('oracle', 'Oracle',),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/airflow/www/static/connection_form.js
----------------------------------------------------------------------
diff --git a/airflow/www/static/connection_form.js b/airflow/www/static/connection_form.js
index c40bba7..902cf74 100644
--- a/airflow/www/static/connection_form.js
+++ b/airflow/www/static/connection_form.js
@@ -39,12 +39,20 @@
                 'schema': 'Database'
             }
         },
+        jenkins: {
+          hidden_fields: ['schema'],
+          relabeling: {
+            'login': 'Username',
+            'password': 'API token or password',
+            'extra': 'Use https (true/false, default false)'
+          }
+        },
         docker: {
             hidden_fields: ['port', 'schema'],
             relabeling: {
                 'host': 'Registry URL',
                 'login': 'Username',
-            },
+            }
         },
       }
       function connTypeChange(connectionType) {

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 4bcc453..9c028d5 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -74,6 +74,7 @@ PyOpenSSL
 PySmbClient
 python-daemon
 python-dateutil
+python-jenkins
 qds-sdk>=1.9.6
 redis
 rednose

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 0ff6ea2..d3f48e3 100644
--- a/setup.py
+++ b/setup.py
@@ -128,6 +128,7 @@ gcp_api = [
 ]
 hdfs = ['snakebite>=2.7.8']
 webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
+jenkins = ['python-jenkins>=0.4.15']
 jira = ['JIRA>1.0.7']
 hive = [
     'hive-thrift-py>=0.0.1',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/667a26ce/tests/contrib/operators/test_jenkins_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_jenkins_operator.py b/tests/contrib/operators/test_jenkins_operator.py
new file mode 100644
index 0000000..e61201a
--- /dev/null
+++ b/tests/contrib/operators/test_jenkins_operator.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+import jenkins
+
+from airflow.contrib.operators.jenkins_job_trigger_operator \
+    import JenkinsJobTriggerOperator
+from airflow.contrib.hooks.jenkins_hook import JenkinsHook
+
+from airflow.exceptions import AirflowException
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class JenkinsOperatorTestCase(unittest.TestCase):
+    @unittest.skipIf(mock is None, 'mock package not present')
+    def test_execute(self):
+        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
+        jenkins_mock.get_build_info.return_value = \
+            {'result': 'SUCCESS',
+             'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}
+        jenkins_mock.build_job_url.return_value = \
+            'http://www.jenkins.url/somewhere/in/the/universe'
+
+        hook_mock = mock.Mock(spec=JenkinsHook)
+        hook_mock.get_jenkins_server.return_value = jenkins_mock
+
+        the_parameters = {'a_param': 'blip', 'another_param': '42'}
+
+        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
+                mock.patch('airflow.contrib.operators'
+                           '.jenkins_job_trigger_operator.jenkins_request_with_headers') \
+                as mock_make_request:
+            mock_make_request.side_effect = \
+                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
+                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
+            get_hook_mocked.return_value = hook_mock
+            operator = JenkinsJobTriggerOperator(
+                dag=None,
+                jenkins_connection_id="fake_jenkins_connection",
+                # The hook is mocked, this connection won't be used
+                task_id="operator_test",
+                job_name="a_job_on_jenkins",
+                parameters=the_parameters,
+                sleep_time=1)
+
+            operator.execute(None)
+
+            self.assertEquals(jenkins_mock.get_build_info.call_count, 1)
+            jenkins_mock.get_build_info.assert_called_with(name='a_job_on_jenkins',
+                                                           number='1')
+
+    @unittest.skipIf(mock is None, 'mock package not present')
+    def test_execute_job_polling_loop(self):
+        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
+        jenkins_mock.get_job_info.return_value = {'nextBuildNumber': '1'}
+        jenkins_mock.get_build_info.side_effect = \
+            [{'result': None},
+             {'result': 'SUCCESS',
+              'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}]
+        jenkins_mock.build_job_url.return_value = \
+            'http://www.jenkins.url/somewhere/in/the/universe'
+
+        hook_mock = mock.Mock(spec=JenkinsHook)
+        hook_mock.get_jenkins_server.return_value = jenkins_mock
+
+        the_parameters = {'a_param': 'blip', 'another_param': '42'}
+
+        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
+            mock.patch('airflow.contrib.operators.jenkins_job_trigger_operator'
+                       '.jenkins_request_with_headers') as mock_make_request:
+            mock_make_request.side_effect = \
+                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
+                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
+            get_hook_mocked.return_value = hook_mock
+            operator = JenkinsJobTriggerOperator(
+                dag=None,
+                task_id="operator_test",
+                job_name="a_job_on_jenkins",
+                jenkins_connection_id="fake_jenkins_connection",
+                # The hook is mocked, this connection won't be used
+                parameters=the_parameters,
+                sleep_time=1)
+
+            operator.execute(None)
+            self.assertEquals(jenkins_mock.get_build_info.call_count, 2)
+
+    @unittest.skipIf(mock is None, 'mock package not present')
+    def test_execute_job_failure(self):
+        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
+        jenkins_mock.get_job_info.return_value = {'nextBuildNumber': '1'}
+        jenkins_mock.get_build_info.return_value = {
+            'result': 'FAILURE',
+            'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}
+        jenkins_mock.build_job_url.return_value = \
+            'http://www.jenkins.url/somewhere/in/the/universe'
+
+        hook_mock = mock.Mock(spec=JenkinsHook)
+        hook_mock.get_jenkins_server.return_value = jenkins_mock
+
+        the_parameters = {'a_param': 'blip', 'another_param': '42'}
+
+        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
+            mock.patch('airflow.contrib.operators.'
+                       'jenkins_job_trigger_operator.jenkins_request_with_headers') \
+                as mock_make_request:
+            mock_make_request.side_effect = \
+                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
+                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
+            get_hook_mocked.return_value = hook_mock
+            operator = JenkinsJobTriggerOperator(
+                dag=None,
+                task_id="operator_test",
+                job_name="a_job_on_jenkins",
+                parameters=the_parameters,
+                jenkins_connection_id="fake_jenkins_connection",
+                # The hook is mocked, this connection won't be used
+                sleep_time=1)
+
+            self.assertRaises(AirflowException, operator.execute, None)
+
+
+if __name__ == "__main__":
+    unittest.main()

Reply via email to