Hi,

I used this requirements.txt file to install Airflow from the v1-10-test branch:

git+https://github.com/celery/celery@master#egg=celery
git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
kombu>=4.1.0


In my airflow.cfg, I have:

[celery]
executor = CeleryExecutor
broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb

[celery_broker_transport_options]
#
#

However, if I manually run this code inside the webserver, the section is not empty:

python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)"
OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
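
My guess at what is happening, as a minimal sketch: getsection() seems to start from Airflow's built-in defaults and then layer my airflow.cfg section on top, so an empty section overrides nothing. The snippet below imitates that with the standard-library configparser (Python 3). DEFAULTS, USER_CFG and merged_section are names I made up for illustration, not Airflow code, and the defaults are trimmed to two keys.

```python
import configparser
from collections import OrderedDict

# Hypothetical stand-in for the defaults shipped with Airflow (trimmed).
DEFAULTS = """
[celery_broker_transport_options]
visibility_timeout = 21600
ssl_active = False
"""

# Hypothetical stand-in for my airflow.cfg: the section exists but is empty.
USER_CFG = """
[celery_broker_transport_options]
"""


def merged_section(defaults_text, user_text, section):
    """Return the defaults for `section`, updated with any user-set keys."""
    defaults = configparser.ConfigParser()
    defaults.read_string(defaults_text)
    user = configparser.ConfigParser()
    user.read_string(user_text)

    merged = OrderedDict()
    if defaults.has_section(section):
        merged.update(defaults.items(section))
    if user.has_section(section):
        # An empty user section contributes no keys, so nothing is removed.
        merged.update(user.items(section))
    return merged


merged = merged_section(DEFAULTS, USER_CFG, "celery_broker_transport_options")
print(merged)
```

If the real merge works like this, an empty section can never remove a default key, which would match the output I see.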

My worker crashes with this error:


[2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key [celery/ssl_active] not found in config
[2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor will run without SSL
[2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor CeleryExecutor
[2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: TypeError(u"Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.",)
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
    c.connection, on_decode_error=c.on_decode_error,
  File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
    **kw
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__
    self.revive(self.channel)
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive
    self.declare()
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare
    queue.declare()
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine
    return strategy.create(*args, **kwargs)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
    engineclass.__name__))
TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
 
 -------------- celery@qa1 v4.2.0rc3 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x4766d50
- ** ---------- .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> airflow_celery   exchange=airflow_celery(direct) key=airflow_celery
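
Reading the traceback, the failing frame is create_engine(conninfo.hostname, **transport_options) in kombu's sqlalchemy transport, so every broker transport option ends up as a keyword argument to create_engine(). Here is a rough sketch of that failure mode without SQLAlchemy installed. create_engine_stub and open_engine are hypothetical stand-ins I wrote for illustration; the stub relies on Python's own keyword checking, whereas the real create_engine() raises its own "Invalid argument(s)" TypeError.

```python
def create_engine_stub(url, echo=False, pool_size=5):
    """Hypothetical stand-in for sqlalchemy.create_engine.

    It accepts only a fixed set of keyword arguments, so any other
    keyword makes Python raise TypeError at call time.
    """
    return {"url": url, "echo": echo, "pool_size": pool_size}


def open_engine(url, transport_options):
    # Same call shape as the failing frame in the traceback:
    # create_engine(conninfo.hostname, **transport_options)
    return create_engine_stub(url, **transport_options)


# Options in the style of the defaults that reach the worker.
default_options = {"visibility_timeout": 21600, "ssl_active": False}

try:
    open_engine("mysql://airflow:blah@localhost:3306/mydb", default_options)
    failed = False
except TypeError as exc:
    failed = True
    print("rejected:", exc)

# With an empty options dict the same call goes through.
engine = open_engine("mysql://airflow:blah@localhost:3306/mydb", {})
print(engine)
```

So for a sqla+mysql broker, the options would apparently have to reach the transport as an empty dict, or contain only keywords that create_engine() accepts.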



What is the correct way to override celery_broker_transport_options?
I thought that having an empty section in airflow.cfg would be enough.

I thought that this was fixed with: 
https://github.com/apache/incubator-airflow/pull/2842


I cannot pass visibility_timeout or ssl_key to a MySQL backend.
--
Craig