Fokko closed pull request #4211: [AIRFLOW-3365][AIRFLOW-3366] Allow celery_broker_transport_options to be set with environment variables. URL: https://github.com/apache/incubator-airflow/pull/4211
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index d44f2b3448..5e72134de1 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -37,7 +37,7 @@ def _broker_supports_visibility_timeout(url): ) if 'visibility_timeout' not in broker_transport_options: if _broker_supports_visibility_timeout(broker_url): - broker_transport_options = {'visibility_timeout': 21600} + broker_transport_options['visibility_timeout'] = 21600 DEFAULT_CELERY_CONFIG = { 'accept_content': ['json', 'pickle'], diff --git a/airflow/configuration.py b/airflow/configuration.py index d07faf1cf8..3662df8d06 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -324,6 +324,12 @@ def getsection(self, section): if section in self._sections: _section.update(copy.deepcopy(self._sections[section])) + section_prefix = 'AIRFLOW__{S}__'.format(S=section.upper()) + for env_var in sorted(os.environ.keys()): + if env_var.startswith(section_prefix): + key = env_var.replace(section_prefix, '').lower() + _section[key] = self._get_env_var_option(section, key) + for key, val in iteritems(_section): try: val = int(val) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 09284c9972..acebd5732c 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -165,7 +165,7 @@ def test_getsection(self): key1 = awesome key2 = airflow -[another] +[testsection] key3 = value3 ''' test_conf = AirflowConfigParser( @@ -177,18 +177,18 @@ def test_getsection(self): test_conf.getsection('test') ) self.assertEqual( - OrderedDict([('key3', 'value3')]), - test_conf.getsection('another') + OrderedDict([ + ('key3', 
'value3'), + ('testkey', 'testvalue'), + ('testpercent', 'with%percent')]), + test_conf.getsection('testsection') ) def test_broker_transport_options(self): section_dict = conf.getsection("celery_broker_transport_options") self.assertTrue(isinstance(section_dict['visibility_timeout'], int)) - self.assertTrue(isinstance(section_dict['_test_only_bool'], bool)) - self.assertTrue(isinstance(section_dict['_test_only_float'], float)) - self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types)) def test_deprecated_options(self): ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services