Fokko closed pull request #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email 
attachments
URL: https://github.com/apache/incubator-airflow/pull/4119
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide it once the pull request is merged):

diff --git a/airflow/contrib/hooks/imap_hook.py 
b/airflow/contrib/hooks/imap_hook.py
new file mode 100644
index 0000000000..c0b3126a8f
--- /dev/null
+++ b/airflow/contrib/hooks/imap_hook.py
@@ -0,0 +1,280 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import email
+import imaplib
+import os
+import re
+
+from airflow import LoggingMixin
+from airflow.hooks.base_hook import BaseHook
+
+
+class ImapHook(BaseHook):
+    """
+    This hook connects to a mail server by using the imap protocol.
+
+    :param imap_conn_id: The connection id that contains the information
+                         used to authenticate the client.
+                         The default value is 'imap_default'.
+    :type imap_conn_id: str
+    """
+
+    def __init__(self, imap_conn_id='imap_default'):
+        super(ImapHook, self).__init__(imap_conn_id)
+        self.conn = self.get_connection(imap_conn_id)
+        self.mail_client = imaplib.IMAP4_SSL(self.conn.host)
+
+    def __enter__(self):
+        self.mail_client.login(self.conn.login, self.conn.password)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.mail_client.logout()
+
+    def has_mail_attachment(self, name, mail_folder='INBOX', 
check_regex=False):
+        """
+        Checks the mail folder for mails containing attachments with the given 
name.
+
+        :param name: The name of the attachment that will be searched for.
+        :type name: str
+        :param mail_folder: The mail folder where to look at.
+                            The default value is 'INBOX'.
+        :type mail_folder: str
+        :param check_regex: Checks the name for a regular expression.
+                            The default value is False.
+        :type check_regex: bool
+        :returns: True if there is an attachment with the given name and False 
if not.
+        :rtype: bool
+        """
+        mail_attachments = self._retrieve_mails_attachments_by_name(name, 
mail_folder,
+                                                                    
check_regex,
+                                                                    
latest_only=True)
+        return len(mail_attachments) > 0
+
+    def retrieve_mail_attachments(self, name, mail_folder='INBOX', 
check_regex=False,
+                                  latest_only=False):
+        """
+        Retrieves mail's attachments in the mail folder by its name.
+
+        :param name: The name of the attachment that will be downloaded.
+        :type name: str
+        :param mail_folder: The mail folder where to look at.
+                            The default value is 'INBOX'.
+        :type mail_folder: str
+        :param check_regex: Checks the name for a regular expression.
+                            The default value is False.
+        :type check_regex: bool
+        :param latest_only: If set to True it will only retrieve
+                            the first matched attachment.
+                            The default value is False.
+        :type latest_only: bool
+        :returns: a list of tuple each containing the attachment filename and 
its payload.
+        :rtype: a list of tuple
+        """
+        mail_attachments = self._retrieve_mails_attachments_by_name(name, 
mail_folder,
+                                                                    
check_regex,
+                                                                    
latest_only)
+        return mail_attachments
+
+    def download_mail_attachments(self, name, local_output_directory, 
mail_folder='INBOX',
+                                  check_regex=False, latest_only=False):
+        """
+        Downloads mail's attachments in the mail folder by its name
+        to the local directory.
+
+        :param name: The name of the attachment that will be downloaded.
+        :type name: str
+        :param local_output_directory: The output directory on the local 
machine
+                                       where the files will be downloaded to.
+        :type local_output_directory: str
+        :param mail_folder: The mail folder where to look at.
+                            The default value is 'INBOX'.
+        :type mail_folder: str
+        :param check_regex: Checks the name for a regular expression.
+                            The default value is False.
+        :type check_regex: bool
+        :param latest_only: If set to True it will only download
+                            the first matched attachment.
+                            The default value is False.
+        :type latest_only: bool
+        """
+        mail_attachments = self._retrieve_mails_attachments_by_name(name, 
mail_folder,
+                                                                    
check_regex, latest_only)
+        self._create_files(mail_attachments, local_output_directory)
+
+    def _retrieve_mails_attachments_by_name(self, name, mail_folder, 
check_regex,
+                                            latest_only):
+        all_matching_attachments = []
+
+        self.mail_client.select(mail_folder)
+
+        for mail_id in self._list_mail_ids_desc():
+            response_mail_body = self._fetch_mail_body(mail_id)
+            matching_attachments = self._check_mail_body(response_mail_body, 
name, check_regex, latest_only)
+
+            if matching_attachments:
+                all_matching_attachments.extend(matching_attachments)
+                if latest_only:
+                    break
+
+        self.mail_client.close()
+
+        return all_matching_attachments
+
+    def _list_mail_ids_desc(self):
+        result, data = self.mail_client.search(None, 'All')
+        mail_ids = data[0].split()
+        return reversed(mail_ids)
+
+    def _fetch_mail_body(self, mail_id):
+        result, data = self.mail_client.fetch(mail_id, '(RFC822)')
+        mail_body = data[0][1]  # The mail body is always in this specific 
location
+        mail_body_str = mail_body.decode('utf-8')
+        return mail_body_str
+
+    def _check_mail_body(self, response_mail_body, name, check_regex, 
latest_only):
+        mail = Mail(response_mail_body)
+        if mail.has_attachments():
+            return mail.get_attachments_by_name(name, check_regex, 
find_first=latest_only)
+
+    def _create_files(self, mail_attachments, local_output_directory):
+        for name, payload in mail_attachments:
+            if self._is_symlink(name):
+                self.log.error('Can not create file because it is a symlink!')
+            elif self._is_escaping_current_directory(name):
+                self.log.error('Can not create file because it is escaping the 
current directory!')
+            else:
+                self._create_file(name, payload, local_output_directory)
+
+    def _is_symlink(self, name):
+        return os.path.islink(name)
+
+    def _is_escaping_current_directory(self, name):
+        return '../' in name
+
+    def _correct_path(self, name, local_output_directory):
+        return local_output_directory + name if 
local_output_directory.endswith('/') \
+            else local_output_directory + '/' + name
+
+    def _create_file(self, name, payload, local_output_directory):
+        file_path = self._correct_path(name, local_output_directory)
+
+        with open(file_path, 'wb') as file:
+            file.write(payload)
+
+
+class Mail(LoggingMixin):
+    """
+    This class simplifies working with mails returned by the imaplib client.
+
+    :param mail_body: The mail body of a mail received from imaplib client.
+    :type mail_body: str
+    """
+
+    def __init__(self, mail_body):
+        super(Mail, self).__init__()
+        self.mail = email.message_from_string(mail_body)
+
+    def has_attachments(self):
+        """
+        Checks the mail for a attachments.
+
+        :returns: True if it has attachments and False if not.
+        :rtype: bool
+        """
+        return self.mail.get_content_maintype() == 'multipart'
+
+    def get_attachments_by_name(self, name, check_regex, find_first=False):
+        """
+        Gets all attachments by name for the mail.
+
+        :param name: The name of the attachment to look for.
+        :type name: str
+        :param check_regex: Checks the name for a regular expression.
+        :type check_regex: bool
+        :param find_first: If set to True it will only find the first match 
and then quit.
+                           The default value is False.
+        :type find_first: bool
+        :returns: a list of tuples each containing name and payload
+                  where the attachments name matches the given name.
+        :rtype: list of tuple
+        """
+        attachments = []
+
+        for part in self.mail.walk():
+            mail_part = MailPart(part)
+            if mail_part.is_attachment():
+                found_attachment = mail_part.has_matching_name(name) if 
check_regex \
+                    else mail_part.has_equal_name(name)
+                if found_attachment:
+                    file_name, file_payload = mail_part.get_file()
+                    self.log.info('Found attachment: {}'.format(file_name))
+                    attachments.append((file_name, file_payload))
+                    if find_first:
+                        break
+
+        return attachments
+
+
+class MailPart:
+    """
+    This class is a wrapper for a Mail object's part and gives it more 
features.
+
+    :param part: The mail part in a Mail object.
+    :type part: any
+    """
+
+    def __init__(self, part):
+        self.part = part
+
+    def is_attachment(self):
+        """
+        Checks if the part is a valid mail attachment.
+
+        :returns: True if it is an attachment and False if not.
+        :rtype: bool
+        """
+        return self.part.get_content_maintype() != 'multipart' and 
self.part.get('Content-Disposition')
+
+    def has_matching_name(self, name):
+        """
+        Checks if the given name matches the part's name.
+
+        :param name: The name to look for.
+        :type name: str
+        :returns: True if it matches the name (including regular expression).
+        :rtype: tuple
+        """
+        return re.match(name, self.part.get_filename())
+
+    def has_equal_name(self, name):
+        """
+        Checks if the given name is equal to the part's name.
+
+        :param name: The name to look for.
+        :type name: str
+        :returns: True if it is equal to the given name.
+        :rtype: bool
+        """
+        return self.part.get_filename() == name
+
+    def get_file(self):
+        """
+        Gets the file including name and payload.
+
+        :returns: the part's name and payload.
+        :rtype: tuple
+        """
+        return self.part.get_filename(), self.part.get_payload(decode=True)
diff --git a/docs/code.rst b/docs/code.rst
index 211e1abafe..02366e728b 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -399,6 +399,7 @@ Community contributed hooks
 .. autoclass:: airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook
 .. autoclass:: airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook
 .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
+.. autoclass:: airflow.contrib.hooks.imap_hook.ImapHook
 .. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook
 .. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook
 .. autoclass:: airflow.contrib.hooks.mongo_hook.MongoHook
diff --git a/tests/contrib/hooks/test_imap_hook.py 
b/tests/contrib/hooks/test_imap_hook.py
new file mode 100644
index 0000000000..63dc13a874
--- /dev/null
+++ b/tests/contrib/hooks/test_imap_hook.py
@@ -0,0 +1,266 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imaplib
+import unittest
+
+from mock import Mock, patch, mock_open
+
+from airflow import configuration, models
+from airflow.contrib.hooks.imap_hook import ImapHook
+from airflow.utils import db
+
+imaplib_string = 'airflow.contrib.hooks.imap_hook.imaplib'
+open_string = 'airflow.contrib.hooks.imap_hook.open'
+
+
+def _create_fake_imap(mock_imaplib, with_mail=False, 
attachment_name='test1.csv'):
+    mock_conn = Mock(spec=imaplib.IMAP4_SSL)
+    mock_imaplib.IMAP4_SSL.return_value = mock_conn
+
+    mock_conn.login.return_value = ('OK', [])
+
+    if with_mail:
+        mock_conn.select.return_value = ('OK', [])
+        mock_conn.search.return_value = ('OK', [b'1'])
+        mail_string = \
+            'Content-Type: multipart/mixed; boundary=123\r\n--123\r\n' \
+            'Content-Disposition: attachment; filename="{}";' \
+            'Content-Transfer-Encoding: 
base64\r\nSWQsTmFtZQoxLEZlbGl4\r\n--123--'.format(attachment_name)
+        mock_conn.fetch.return_value = ('OK', [(b'', 
mail_string.encode('utf-8'))])
+        mock_conn.close.return_value = ('OK', [])
+
+    mock_conn.logout.return_value = ('OK', [])
+
+    return mock_conn
+
+
+class TestImapHook(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+        db.merge_conn(
+            models.Connection(
+                conn_id='imap_default',
+                host='imap_server_address',
+                login='imap_user',
+                password='imap_password'
+            )
+        )
+
+    @patch(imaplib_string)
+    def test_connect_and_disconnect(self, mock_imaplib):
+        mock_conn = _create_fake_imap(mock_imaplib)
+
+        with ImapHook():
+            pass
+
+        mock_imaplib.IMAP4_SSL.assert_called_once_with('imap_server_address')
+        mock_conn.login.assert_called_once_with('imap_user', 'imap_password')
+        mock_conn.logout.assert_called_once()
+
+    @patch(imaplib_string)
+    def test_has_mail_attachments_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            has_attachment_in_inbox = 
imap_hook.has_mail_attachment('test1.csv')
+
+        self.assertTrue(has_attachment_in_inbox)
+
+    @patch(imaplib_string)
+    def test_has_mail_attachments_not_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            has_attachment_in_inbox = 
imap_hook.has_mail_attachment('test1.txt')
+
+        self.assertFalse(has_attachment_in_inbox)
+
+    @patch(imaplib_string)
+    def test_has_mail_attachments_with_regex_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            has_attachment_in_inbox = imap_hook.has_mail_attachment(
+                name='test(\d+).csv',
+                check_regex=True
+            )
+
+        self.assertTrue(has_attachment_in_inbox)
+
+    @patch(imaplib_string)
+    def test_has_mail_attachments_with_regex_not_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            has_attachment_in_inbox = imap_hook.has_mail_attachment(
+                name='test_(\d+).csv',
+                check_regex=True
+            )
+
+        self.assertFalse(has_attachment_in_inbox)
+
+    @patch(imaplib_string)
+    def test_retrieve_mail_attachments_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            attachments_in_inbox = 
imap_hook.retrieve_mail_attachments('test1.csv')
+
+        self.assertEquals(attachments_in_inbox, [('test1.csv', 
b'SWQsTmFtZQoxLEZlbGl4')])
+
+    @patch(imaplib_string)
+    def test_retrieve_mail_attachments_not_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            attachments_in_inbox = 
imap_hook.retrieve_mail_attachments('test1.txt')
+
+        self.assertEquals(attachments_in_inbox, [])
+
+    @patch(imaplib_string)
+    def test_retrieve_mail_attachments_with_regex_found(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            attachments_in_inbox = imap_hook.retrieve_mail_attachments(
+                name='test(\d+).csv',
+                check_regex=True
+            )
+
+        self.assertEquals(attachments_in_inbox, [('test1.csv', 
b'SWQsTmFtZQoxLEZlbGl4')])
+
+    @patch(imaplib_string)
+    def test_retrieve_mail_attachments_with_regex_not_found(self, 
mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            attachments_in_inbox = imap_hook.retrieve_mail_attachments(
+                name='test_(\d+).csv',
+                check_regex=True
+            )
+
+        self.assertEquals(attachments_in_inbox, [])
+
+    @patch(imaplib_string)
+    def test_retrieve_mail_attachments_latest_only(self, mock_imaplib):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            attachments_in_inbox = imap_hook.retrieve_mail_attachments(
+                name='test1.csv',
+                latest_only=True
+            )
+
+        self.assertEquals(attachments_in_inbox, [('test1.csv', 
b'SWQsTmFtZQoxLEZlbGl4')])
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_found(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments('test1.csv', 'test_directory')
+
+        mock_open_method.assert_called_once_with('test_directory/test1.csv', 
'wb')
+        
mock_open_method().write.assert_called_once_with(b'SWQsTmFtZQoxLEZlbGl4')
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_not_found(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments('test1.txt', 'test_directory')
+
+        mock_open_method.assert_not_called()
+        mock_open_method().write.assert_not_called()
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_with_regex_found(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments(
+                name='test(\d+).csv',
+                local_output_directory='test_directory',
+                check_regex=True
+            )
+
+        mock_open_method.assert_called_once_with('test_directory/test1.csv', 
'wb')
+        
mock_open_method().write.assert_called_once_with(b'SWQsTmFtZQoxLEZlbGl4')
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_with_regex_not_found(self, 
mock_imaplib, mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments(
+                name='test_(\d+).csv',
+                local_output_directory='test_directory',
+                check_regex=True
+            )
+
+        mock_open_method.assert_not_called()
+        mock_open_method().write.assert_not_called()
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_with_latest_only(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True)
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments(
+                name='test1.csv',
+                local_output_directory='test_directory',
+                latest_only=True
+            )
+
+        mock_open_method.assert_called_once_with('test_directory/test1.csv', 
'wb')
+        
mock_open_method().write.assert_called_once_with(b'SWQsTmFtZQoxLEZlbGl4')
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_with_escaping_chars(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True, 
attachment_name='../test1.csv')
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments(
+                name='test1.csv',
+                local_output_directory='test_directory'
+            )
+
+        mock_open_method.assert_not_called()
+        mock_open_method().write.assert_not_called()
+
+    @patch(open_string, new_callable=mock_open)
+    @patch(imaplib_string)
+    def test_download_mail_attachments_with_symlink(self, mock_imaplib, 
mock_open_method):
+        _create_fake_imap(mock_imaplib, with_mail=True, 
attachment_name='initrd.img')
+
+        with ImapHook() as imap_hook:
+            imap_hook.download_mail_attachments(
+                name='test1.csv',
+                local_output_directory='test_directory'
+            )
+
+        mock_open_method.assert_not_called()
+        mock_open_method().write.assert_not_called()
+
+
+if __name__ == '__main__':
+    unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to