Bolke,

Can you help me with this?
You have worked on the code that parses the celery broker options.

I cannot figure out how to override the defaults, and the wrong values
are being passed down into the mysql backend, causing the worker to fail.
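
As far as I can tell, the failure pattern looks like the sketch below.
This is only an illustration under my own assumptions: merged_section
and strict_create_engine are names I made up, not Airflow or SQLAlchemy
functions, and the allowed keyword set is invented. The default values
are copied from the getsection() output quoted further down.

```python
from collections import OrderedDict

# Defaults as reported by getsection() in the quoted message below.
DEFAULT_TRANSPORT_OPTIONS = OrderedDict([
    ("visibility_timeout", 21600),
    ("ssl_active", False),
    ("ssl_key", ""),
    ("ssl_cert", ""),
    ("ssl_cacert", ""),
])


def merged_section(defaults, user_section):
    """Overlay the user's section on the defaults (assumed behaviour)."""
    merged = OrderedDict(defaults)
    merged.update(user_section)
    return merged


def strict_create_engine(url, **kwargs):
    """Hypothetical stand-in for a callee that rejects unknown keywords."""
    allowed = {"echo", "pool_size", "pool_recycle"}  # invented for the sketch
    unknown = sorted(set(kwargs) - allowed)
    if unknown:
        raise TypeError(
            "Invalid argument(s) %s" % ",".join(repr(k) for k in unknown)
        )
    return url, kwargs


# An empty [celery_broker_transport_options] section overrides nothing,
# so every default is still present after the merge.
options = merged_section(DEFAULT_TRANSPORT_OPTIONS, {})

try:
    strict_create_engine("mysql://airflow:blah@localhost:3306/mydb", **options)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

If this is what is going on, an empty section in airflow.cfg can never
remove a default key; it can only leave the defaults in place.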

This is blocking me from doing further testing of airflow 1.10 in my
environment.

Since I have found stability bugs in airflow 1.9.0 that have been fixed in
master, I want to try to run airflow 1.10 from git.

Thanks.
--
Craig

On Mon, May 21, 2018 at 1:50 AM Craig Rodrigues <crodr...@gmail.com> wrote:

> Hi,
>
> I used this requirements.txt file to install airflow from the v1-10-test
> branch:
>
> git+https://github.com/celery/celery@master#egg=celery
> git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> kombu>=4.1.0
>
>
> In my airflow.cfg, I have:
>
> [celery]
> executor = CeleryExecutor
> broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
>
> [celery_broker_transport_options]
> #
> #
>
> However, if I manually run this code inside the webserver, I see:
>
> python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)"
> OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False),
> (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
>
> My worker crashes with this error:
>
>
> [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key
> [celery/ssl_active] not found in config
> [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor
> will run without SSL
> [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor
> CeleryExecutor
> [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error:
> TypeError(u"Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration
> MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword
> arguments are appropriate for this combination of components.",)
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line
> 205, in start
>     self.blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119,
> in start
>     step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369,
> in start
>     return self.obj.start()
>   File
> "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line
> 322, in start
>     blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119,
> in start
>     step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py",
> line 41, in start
>     c.connection, on_decode_error=c.on_decode_error,
>   File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in
> TaskConsumer
>     **kw
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in
> __init__
>     self.revive(self.channel)
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in
> revive
>     self.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in
> declare
>     queue.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in
> declare
>     self._create_queue(nowait=nowait, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in
> _create_queue
>     self.queue_declare(nowait=nowait, passive=False, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in
> queue_declare
>     nowait=nowait,
>   File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py",
> line 531, in queue_declare
>     self._new_queue(queue, **kwargs)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 82, in _new_queue
>     self._get_or_create(queue)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 70, in _get_or_create
>     obj = self.session.query(self.queue_cls) \
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 65, in session
>     _, Session = self._open()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 56, in _open
>     engine = self._engine_from_config()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 51, in _engine_from_config
>     return create_engine(conninfo.hostname, **transport_options)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py",
> line 391, in create_engine
>     return strategy.create(*args, **kwargs)
>   File
> "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line
> 160, in create
>     engineclass.__name__))
> TypeError: Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration
> MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword
> arguments are appropriate for this combination of components.
>
>  -------------- celery@qa1 v4.2.0rc3 (windowlicker)
> ---- **** -----
> --- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core
> 2018-05-21 07:46:12
> -- * - **** ---
> - ** ---------- [config]
> - ** ---------- .> app:         airflow.executors.celery_executor:0x4766d50
> - ** ---------- .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
> - ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
> - *** --- * --- .> concurrency: 16 (prefork)
> -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
> worker)
> --- ***** -----
>  -------------- [queues]
>                 .> airflow_celery   exchange=airflow_celery(direct)
> key=airflow_celery
>
>
>
> What is the correct way to override the celery_broker_transport_options?
> I thought that having an empty section in airflow.cfg would be enough?
>
> I thought that this was fixed with:
> https://github.com/apache/incubator-airflow/pull/2842
>
>
> I cannot pass visibility_timeout or ssl_key to a mysql backend.
> --
> Craig
>

-- 
Craig Rodrigues
rodr...@rodrigues.org
