[ https://issues.apache.org/jira/browse/AIRFLOW-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Craig Rodrigues updated AIRFLOW-2519: ------------------------------------- Description: Hi, I used this requirements.txt file to install airflow from the v1-10-test branch: git+[https://github.com/celery/celery@master#egg=celery] git+[https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]] kombu>=4.1.0 In my airflow.cfg, I have: [celery] executor = CeleryExecutor executor = CeleryExec broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb [celery_broker_transport_options] # # However, if I manually run this code inside the webserver, I see: python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)" OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')]) My worker crashes with this error: [2018-05-21 07:46:12,406] \{configuration.py:212} WARNING - section/key [celery/ssl_active] not found in config [2018-05-21 07:46:12,407] \{default_celery.py:51} WARNING - Celery Executor will run without SSL [2018-05-21 07:46:12,411] \{__init__.py:48} INFO - Using executor CeleryExecutor [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: TypeError(u"Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. Please check that the keyword arguments are appropriate for this combination of components.",) Traceback (most recent call last): File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start self.blueprint.start(self) File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start step.start(parent) File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start return self.obj.start() File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 322, in start blueprint.start(self) File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start step.start(parent) File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start c.connection, on_decode_error=c.on_decode_error, File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer **kw File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__ self.revive(self.channel) File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive self.declare() File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare queue.declare() File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare self._create_queue(nowait=nowait, channel=channel) File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue self.queue_declare(nowait=nowait, passive=False, channel=channel) File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare nowait=nowait, File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare self._new_queue(queue, **kwargs) File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue self._get_or_create(queue) File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create obj = self.session.query(self.queue_cls) \ File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session _, Session = self._open() File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open engine = self._engine_from_config() File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config return create_engine(conninfo.hostname, **transport_options) File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine return strategy.create(*args, **kwargs) File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create engineclass.__name__)) TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. Please check that the keyword arguments are appropriate for this combination of components. -------------- celery@qa1 v4.2.0rc3 (windowlicker) ---- **** ----- --- * *** * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: airflow.executors.celery_executor:0x4766d50 - ** ---------- .> transport: sqla+mysql://airflow:blah@localhost:3306/mydb - ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow - *** --- * --- .> concurrency: 16 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> airflow_celery exchange=airflow_celery(direct) key=airflow_celery What is the correct way to override the celery_broker_transport_options? I thought that having an empty section in airflow.cfg would be enough? I thought that this was fixed with: [https://github.com/apache/incubator-airflow/pull/2842] I cannot pass visibilty_timeout or ssl_key to a mysql backend. > Wrong options passed to SQLAlchemy celery broker backend > -------------------------------------------------------- > > Key: AIRFLOW-2519 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2519 > Project: Apache Airflow > Issue Type: Bug > Components: celery > Affects Versions: 1.9.0 > Reporter: Craig Rodrigues > Priority: Major > > Hi, > > I used this requirements.txt file to install airflow from the v1-10-test > branch: > > git+[https://github.com/celery/celery@master#egg=celery] > > git+[https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]] > kombu>=4.1.0 > > > In my airflow.cfg, I have: > > [celery] > executor = CeleryExecutor > > executor = CeleryExec > broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb > > [celery_broker_transport_options] > # > # > > However, if I manually run this code inside the webserver, I see: > > python -c "from airflow import configuration; c = > configuration.conf.getsection('celery_broker_transport_options'); print(c)" > OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), > (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')]) > > My worker crashes with this error: > > > [2018-05-21 07:46:12,406] \{configuration.py:212} WARNING - section/key > [celery/ssl_active] not found in config > [2018-05-21 07:46:12,407] \{default_celery.py:51} WARNING - Celery Executor > will run without SSL > [2018-05-21 07:46:12,411] \{__init__.py:48} INFO - Using executor > CeleryExecutor > [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: > TypeError(u"Invalid argument(s) > 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to > create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. > Please check that the keyword arguments are appropriate for this combination > of components.",) > Traceback (most recent call last): > File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, > in start > self.blueprint.start(self) > File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in > start > step.start(parent) > File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in > start > return self.obj.start() > File > "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line > 322, in start > blueprint.start(self) > File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in > start > step.start(parent) > File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", > line 41, in start > c.connection, on_decode_error=c.on_decode_error, > File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in > TaskConsumer > **kw > File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in > __init__ > self.revive(self.channel) > File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in > revive > self.declare() > File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in > declare > queue.declare() > File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in > declare > self._create_queue(nowait=nowait, channel=channel) > File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in > _create_queue > self.queue_declare(nowait=nowait, passive=False, channel=channel) > File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in > queue_declare > nowait=nowait, > File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", > line 531, in queue_declare > self._new_queue(queue, **kwargs) > File > "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", > line 82, in _new_queue > self._get_or_create(queue) > File > "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", > line 70, in _get_or_create > obj = self.session.query(self.queue_cls) \ > File > "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", > line 65, in session > _, Session = self._open() > File > "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", > line 56, in _open > engine = self._engine_from_config() > File > "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", > line 51, in _engine_from_config > return create_engine(conninfo.hostname, **transport_options) > File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", > line 391, in create_engine > return strategy.create(*args, **kwargs) > File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", > line 160, in create > engineclass.__name__)) > TypeError: Invalid argument(s) > 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to > create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. > Please check that the keyword arguments are appropriate for this combination > of components. > > -------------- celery@qa1 v4.2.0rc3 (windowlicker) > ---- **** ----- > --- * *** * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core > 2018-05-21 07:46:12 > -- * - **** --- > - ** ---------- [config] > - ** ---------- .> app: airflow.executors.celery_executor:0x4766d50 > - ** ---------- .> transport: sqla+mysql://airflow:blah@localhost:3306/mydb > - ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow > - *** --- * --- .> concurrency: 16 (prefork) > -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this > worker) > --- ***** ----- > -------------- [queues] > .> airflow_celery exchange=airflow_celery(direct) > key=airflow_celery > > > > What is the correct way to override the celery_broker_transport_options? > I thought that having an empty section in airflow.cfg would be enough? > > I thought that this was fixed with: > [https://github.com/apache/incubator-airflow/pull/2842] > > > I cannot pass visibilty_timeout or ssl_key to a mysql backend. -- This message was sent by Atlassian JIRA (v7.6.3#76005)