To use the SQLA backend with Celery you need to override the options Airflow passes to Celery. Those come from https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py
Since you don't want most or all of those options (and there is no way in the config file to _remove_ a setting), you will have to point Airflow to a different file for the Celery config. This is the line in the config you will need to change:

    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

If you create something like config/celery_config.py containing:

    CELERY_CONFIG = {
        # Just the options you want to set
    }

(config/ should exist alongside your dags/ folder, and I think it is already added to the Python path), you can then set this in the config:

    celery_config_options = celery_config.CELERY_CONFIG

That should give you complete control.

> On 21 May 2018, at 09:50, Craig Rodrigues <crodr...@gmail.com> wrote:
>
> Hi,
>
> I used this requirements.txt file to install airflow from the v1-10-test
> branch:
>
> git+https://github.com/celery/celery@master#egg=celery
> git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> kombu>=4.1.0
>
>
> In my airflow.cfg, I have:
>
> [celery]
> executor = CeleryExecutor
>
> executor = CeleryExec
> broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
>
> [celery_broker_transport_options]
> #
> #
>
> However, if I manually run this code inside the webserver, I see:
>
> python -c "from airflow import configuration; c =
> configuration.conf.getsection('celery_broker_transport_options'); print(c)"
> OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False),
> (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
>
> My worker crashes with this error:
>
>
> [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key
> [celery/ssl_active] not found in config
> [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor
> will run without SSL
> [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor
> CeleryExecutor
> [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error:
> TypeError(u"Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.
> Please check that the keyword arguments are appropriate for this combination
> of components.",)
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205,
> in start
>     self.blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in
> start
>     step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in
> start
>     return self.obj.start()
>   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py",
> line 322, in start
>     blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in
> start
>     step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py",
> line 41, in start
>     c.connection, on_decode_error=c.on_decode_error,
>   File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in
> TaskConsumer
>     **kw
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in
> __init__
>     self.revive(self.channel)
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in
> revive
>     self.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in
> declare
>     queue.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
>     self._create_queue(nowait=nowait, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in
> _create_queue
>     self.queue_declare(nowait=nowait, passive=False, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in
> queue_declare
>     nowait=nowait,
>   File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py",
> line 531, in queue_declare
>     self._new_queue(queue, **kwargs)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 82, in _new_queue
>     self._get_or_create(queue)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 70, in _get_or_create
>     obj = self.session.query(self.queue_cls) \
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 65, in session
>     _, Session = self._open()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 56, in _open
>     engine = self._engine_from_config()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 51, in _engine_from_config
>     return create_engine(conninfo.hostname, **transport_options)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py",
> line 391, in create_engine
>     return strategy.create(*args, **kwargs)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py",
> line 160, in create
>     engineclass.__name__))
> TypeError: Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.
> Please check that the keyword arguments are appropriate for this combination
> of components.
>
>  -------------- celery@qa1 v4.2.0rc3 (windowlicker)
> ---- **** -----
> --- * *** * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core
> 2018-05-21 07:46:12
> -- * - **** ---
> - ** ---------- [config]
> - ** ---------- .> app: airflow.executors.celery_executor:0x4766d50
> - ** ---------- .> transport: sqla+mysql://airflow:blah@localhost:3306/mydb
> - ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow
> - *** --- * --- .> concurrency: 16 (prefork)
> -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
> worker)
> --- ***** -----
>  -------------- [queues]
>                 .> airflow_celery exchange=airflow_celery(direct)
> key=airflow_celery
>
>
>
> What is the correct way to override the celery_broker_transport_options?
> I thought that having an empty section in airflow.cfg would be enough?
>
> I thought that this was fixed with:
> https://github.com/apache/incubator-airflow/pull/2842
>
>
> I cannot pass visibilty_timeout or ssl_key to a mysql backend.
> --
> Craig
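
For reference, here is a minimal sketch of what the config/celery_config.py suggested in the reply might contain for a sqla+mysql broker. The key names are Celery 4's lowercase setting names, and I am assuming they are the ones Airflow expects in place of the defaults from default_celery.py. The result_backend URL and its PASSWORD placeholder are hypothetical; the broker URL, queue name, and concurrency are copied from the worker output quoted above. The point of the sketch is that broker_transport_options stays empty, so nothing like visibility_timeout or ssl_key is forwarded to create_engine().

```python
# config/celery_config.py
# A sketch only: adjust every value to your own deployment.

CELERY_CONFIG = {
    # Broker URL as it appears in the quoted airflow.cfg.
    'broker_url': 'sqla+mysql://airflow:blah@localhost:3306/mydb',
    # Left empty on purpose: per the traceback, kombu's SQLAlchemy
    # transport passes these options straight to create_engine(),
    # which rejects visibility_timeout and the ssl_* keys.
    'broker_transport_options': {},
    # Hypothetical result backend URL; PASSWORD is a placeholder.
    'result_backend': 'db+mysql://airflow:PASSWORD@localhost:3306/airflow',
    # Queue name and concurrency as shown in the quoted worker banner.
    'task_default_queue': 'airflow_celery',
    'worker_concurrency': 16,
}
```

With that file on the Python path, the celery_config_options = celery_config.CELERY_CONFIG setting from the reply would pick it up. Any other option you still want from the defaults has to be copied into this dict by hand, since the defaults are no longer merged in.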