Fokko closed pull request #4211: [AIRFLOW-3365][AIRFLOW-3366] Allow 
celery_broker_transport_options to be set with environment variables
URL: https://github.com/apache/incubator-airflow/pull/4211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index d44f2b3448..5e72134de1 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -37,7 +37,7 @@ def _broker_supports_visibility_timeout(url):
 )
 if 'visibility_timeout' not in broker_transport_options:
     if _broker_supports_visibility_timeout(broker_url):
-        broker_transport_options = {'visibility_timeout': 21600}
+        broker_transport_options['visibility_timeout'] = 21600
 
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json', 'pickle'],
diff --git a/airflow/configuration.py b/airflow/configuration.py
index d07faf1cf8..3662df8d06 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -324,6 +324,12 @@ def getsection(self, section):
         if section in self._sections:
             _section.update(copy.deepcopy(self._sections[section]))
 
+        section_prefix = 'AIRFLOW__{S}__'.format(S=section.upper())
+        for env_var in sorted(os.environ.keys()):
+            if env_var.startswith(section_prefix):
+                key = env_var.replace(section_prefix, '').lower()
+                _section[key] = self._get_env_var_option(section, key)
+
         for key, val in iteritems(_section):
             try:
                 val = int(val)
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index 09284c9972..acebd5732c 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -165,7 +165,7 @@ def test_getsection(self):
 key1 = awesome
 key2 = airflow
 
-[another]
+[testsection]
 key3 = value3
 '''
         test_conf = AirflowConfigParser(
@@ -177,18 +177,18 @@ def test_getsection(self):
             test_conf.getsection('test')
         )
         self.assertEqual(
-            OrderedDict([('key3', 'value3')]),
-            test_conf.getsection('another')
+            OrderedDict([
+                ('key3', 'value3'),
+                ('testkey', 'testvalue'),
+                ('testpercent', 'with%percent')]),
+            test_conf.getsection('testsection')
         )
 
     def test_broker_transport_options(self):
         section_dict = conf.getsection("celery_broker_transport_options")
         self.assertTrue(isinstance(section_dict['visibility_timeout'], int))
-
         self.assertTrue(isinstance(section_dict['_test_only_bool'], bool))
-
         self.assertTrue(isinstance(section_dict['_test_only_float'], float))
-
         self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))
 
     def test_deprecated_options(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to