Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test 683f46688 -> bd4937b56
[AIRFLOW-2668] Handle missing optional cryptography dependency cryptography is a recommended, but optional dependency. It was mistakenly made a hard dependency by a refactor. This restores that behaviour (though without tests, as it's hard to test that in a unittest) In testing this I found that running `airflow initdb` would end up printing the "crypto is missing" message 15 times, making it hard to see what was actually going on. So I have re-worked `get_fernet()` to only compute (and warn) once. This also makes the consuming code easier as in the case of the dep not being installed we still have a class that presents the same interface as Fernet. Closes #3550 from ashb/AIRFLOW-2668-optional- cryptography (cherry picked from commit fa6c35743a5129057d870752554b0827dd7d462f) Signed-off-by: Bolke de Bruin <bo...@xs4all.nl> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bd4937b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bd4937b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bd4937b5 Branch: refs/heads/v1-10-test Commit: bd4937b56efcffb117f266d1a8c523c82e023e18 Parents: 683f466 Author: Ash Berlin-Taylor <ash_git...@firemirror.com> Authored: Wed Jun 27 22:11:43 2018 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed Jun 27 22:13:03 2018 +0200 ---------------------------------------------------------------------- airflow/models.py | 111 ++++++++++++++++++++++++++++--------------------- 1 file changed, 63 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bd4937b5/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 4706c2d..4b7e123 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -24,11 +24,9 @@ from __future__ import unicode_literals from future.standard_library import install_aliases -from builtins import str -from builtins import object, bytes +from builtins import str, object, bytes, ImportError as BuiltinImportError import copy from collections import namedtuple, defaultdict -import cryptography from datetime import timedelta import dill @@ -104,6 +102,33 @@ XCOM_RETURN_KEY = 'return_value' Stats = settings.Stats +class InvalidFernetToken(Exception): + # If Fernet isn't loaded we need a valid exception class to catch. If it is + # loaded this will get reset to the actual class once get_fernet() is called + pass + + +class NullFernet(object): + """ + A "Null" encryptor class that doesn't encrypt or decrypt but that presents + a similar interface to Fernet. + + The purpose of this is to make the rest of the code not have to know the + difference, and to only display the message once, not 20 times when + `airflow initdb` is ran. + """ + is_encrypted = False + + def decrpyt(self, b): + return b + + def encrypt(self, b): + return b + + +_fernet = None + + def get_fernet(): """ Deferred load of Fernet key. @@ -114,12 +139,25 @@ def get_fernet(): :return: Fernet object :raises: AirflowException if there's a problem trying to load Fernet """ + global _fernet + if _fernet: + return _fernet try: - from cryptography.fernet import Fernet - except ImportError: - raise AirflowException('Failed to import Fernet, it may not be installed') + from cryptography.fernet import Fernet, InvalidToken + global InvalidFernetToken + InvalidFernetToken = InvalidToken + + except BuiltinImportError: + LoggingMixin().log.warn("cryptography not found - values will not be stored " + "encrypted.", + exc_info=1) + _fernet = NullFernet() + return _fernet + try: - return Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8')) + _fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8')) + _fernet.is_encrypted = True + return _fernet except (ValueError, TypeError) as ve: raise AirflowException("Could not create Fernet object: {}".format(ve)) @@ -673,9 +711,8 @@ class Connection(Base, LoggingMixin): def get_password(self): if self._password and self.is_encrypted: - try: - fernet = get_fernet() - except AirflowException: + fernet = get_fernet() + if not fernet.is_encrypted: raise AirflowException( "Can't decrypt encrypted password for login={}, \ FERNET_KEY configuration is missing".format(self.login)) @@ -685,15 +722,9 @@ class Connection(Base, LoggingMixin): def set_password(self, value): if value: - try: - fernet = get_fernet() - self._password = fernet.encrypt(bytes(value, 'utf-8')).decode() - self.is_encrypted = True - except AirflowException: - self.log.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") - self._password = value - self.is_encrypted = False + fernet = get_fernet() + self._password = fernet.encrypt(bytes(value, 'utf-8')).decode() + self.is_encrypted = fernet.is_encrypted @declared_attr def password(cls): @@ -702,9 +733,8 @@ class Connection(Base, LoggingMixin): def get_extra(self): if self._extra and self.is_extra_encrypted: - try: - fernet = get_fernet() - except AirflowException: + fernet = get_fernet() + if not fernet.is_encrypted: raise AirflowException( "Can't decrypt `extra` params for login={},\ FERNET_KEY configuration is missing".format(self.login)) @@ -714,15 +744,9 @@ class Connection(Base, LoggingMixin): def set_extra(self, value): if value: - try: - fernet = get_fernet() - self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode() - self.is_extra_encrypted = True - except AirflowException: - self.log.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") - self._extra = value - self.is_extra_encrypted = False + fernet = get_fernet() + self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode() + self.is_extra_encrypted = fernet.is_encrypted else: self._extra = value self.is_extra_encrypted = False @@ -4366,32 +4390,23 @@ class Variable(Base, LoggingMixin): if self._val and self.is_encrypted: try: fernet = get_fernet() - except Exception: - log.error("Can't decrypt _val for key={}, FERNET_KEY " - "configuration missing".format(self.key)) - return None - try: return fernet.decrypt(bytes(self._val, 'utf-8')).decode() - except cryptography.fernet.InvalidToken: + except InvalidFernetToken: log.error("Can't decrypt _val for key={}, invalid token " "or value".format(self.key)) return None + except Exception: + log.error("Can't decrypt _val for key={}, FERNET_KEY " + "configuration missing".format(self.key)) + return None else: return self._val def set_val(self, value): if value: - try: - fernet = get_fernet() - self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() - self.is_encrypted = True - except AirflowException: - self.log.exception( - "Failed to load fernet while encrypting value, " - "using non-encrypted value." - ) - self._val = value - self.is_encrypted = False + fernet = get_fernet() + self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() + self.is_encrypted = fernet.is_encrypted @declared_attr def val(cls):