Re: Where to put docs on configuring specific kinds of connections? (Or restructuring the docs the way Django does)

2018-05-21 Thread Taylor Edmiston
Hey Tim -

I came to Airflow from the Django world as well and had the same thought that 
much of the work that's been put into their docs over time could be applied 
here too.  In terms of documentation for large Python projects, perhaps they're 
the gold standard.

Can you give a few examples of the specific docs you think would benefit from 
refactoring out here?  I'd be happy to assist with this effort as well.

Taylor
Sent from my iPhone

> On May 21, 2018, at 4:35 PM, Tim Swast  wrote:
> 
> Hey folks,
> 
> I'd like to write some docs on how to create a GCP connection (and leave
> room for documenting other kinds of connections as well). Currently it
> seems like there are a couple places such a thing could fit:
> 
>   - https://airflow.incubator.apache.org/configuration.html#connections
>   -
>   
> https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform
> 
> There's also the concepts guide, but I definitely don't think that's the
> right place for documenting a specific task like this.
> 
> There's a principle I'm used to following with the GCP docs, that the
> distinct kinds of documentation should be organized separately. The Django
> project does this (https://docs.djangoproject.com/en/2.0/) by splitting into
> 
>   - Tutorials
>   - Topic guides (what Airflow calls Concepts)
>   - Reference guides
>   - How-to guides
> 
> I'd like to propose we split some of the existing "Configuration" topics
> into separate how-to guides. What do you think?
> 
> Meta: Should I create JIRA issues for this kind of pre-discussion or start
> here as I've done?
> 
>   •  Tim Swast
>   •  Software Friendliness Engineer
>   •  Google Cloud Developer Relations
>   •  Seattle, WA, USA


Where to put docs on configuring specific kinds of connections? (Or restructuring the docs the way Django does)

2018-05-21 Thread Tim Swast
Hey folks,

I'd like to write some docs on how to create a GCP connection (and leave
room for documenting other kinds of connections as well). Currently it
seems like there are a couple places such a thing could fit:

   - https://airflow.incubator.apache.org/configuration.html#connections
   -
   
https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform

There's also the concepts guide, but I definitely don't think that's the
right place for documenting a specific task like this.

There's a principle I'm used to following with the GCP docs, that the
distinct kinds of documentation should be organized separately. The Django
project does this (https://docs.djangoproject.com/en/2.0/) by splitting into

   - Tutorials
   - Topic guides (what Airflow calls Concepts)
   - Reference guides
   - How-to guides

I'd like to propose we split some of the existing "Configuration" topics
into separate how-to guides. What do you think?

Meta: Should I create JIRA issues for this kind of pre-discussion or start
here as I've done?

  •  Tim Swast
  •  Software Friendliness Engineer
  •  Google Cloud Developer Relations
  •  Seattle, WA, USA


Re: celery problem: cannot override celery_broker_transport_options

2018-05-21 Thread Craig Rodrigues
Bolke,

Can you help me with this?
You have worked on this code with respect to parsing celery broker options.

I cannot figure out how to override the defaults, and wrong values are
being passed
down into the mysql backend, causing things to fail.

This is blocking me from doing further testing of airflow 1.10 in my
environment.

Since I have found stability bugs in airflow 1.9.0 that have been fixed in
master, I want to try
to run airflow 1.10 from git.

Thanks.
--
Craig

On Mon, May 21, 2018 at 1:50 AM Craig Rodrigues  wrote:

> Hi,
>
> I used this requirements.txt file to install airflow from the v1-10-test
> branch:
>
> git+https://github.com/celery/celery@master#egg=celery
> git+
> https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> kombu>=4.1.0
>
>
> In my airflow.cfg, I have:
>
> [celery]
> executor = CeleryExecutor
>
> executor = CeleryExec
> broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
>
> [celery_broker_transport_options]
> #
> #
>
> However, if I manually run this code inside the webserver, I see:
>
> python -c "from airflow import configuration; c =
> configuration.conf.getsection('celery_broker_transport_options'); print(c)"
> OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False),
> (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
>
> My worker crashes with this error:
>
>
> [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key
> [celery/ssl_active] not found in config
> [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor
> will run without SSL
> [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor
> CeleryExecutor
> [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error:
> TypeError(u"Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration
> MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword
> arguments are appropriate for this combination of components.",)
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line
> 205, in start
> self.blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119,
> in start
> step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369,
> in start
> return self.obj.start()
>   File
> "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line
> 322, in start
> blueprint.start(self)
>   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119,
> in start
> step.start(parent)
>   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py",
> line 41, in start
> c.connection, on_decode_error=c.on_decode_error,
>   File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in
> TaskConsumer
> **kw
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in
> __init__
> self.revive(self.channel)
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in
> revive
> self.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in
> declare
> queue.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in
> declare
> self._create_queue(nowait=nowait, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in
> _create_queue
> self.queue_declare(nowait=nowait, passive=False, channel=channel)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in
> queue_declare
> nowait=nowait,
>   File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py",
> line 531, in queue_declare
> self._new_queue(queue, **kwargs)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 82, in _new_queue
> self._get_or_create(queue)
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 70, in _get_or_create
> obj = self.session.query(self.queue_cls) \
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 65, in session
> _, Session = self._open()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 56, in _open
> engine = self._engine_from_config()
>   File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 51, in _engine_from_config
> return create_engine(conninfo.hostname, **transport_options)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py",
> line 391, in create_engine
> return strategy.create(*args, **kwargs)
>   File
> "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line
> 160, in create
> engineclass.__name__))
> TypeError: Invalid argument(s)
> 

Re: S3keysonsor

2018-05-21 Thread Joe Napolitano
Great, I think we're in agreement on your definition of static.

In my own experience, working with S3 keys can be painful if you can't
anticipate the key name. I don't think the S3KeySensor will work as it's
written.

There's another sensor that's not in the docs, but it can be seen just below
the S3KeySensor, called S3PrefixSensor, here:
https://airflow.apache.org/_modules/sensors.html#S3KeySensor

That may work for you. Overall, your question was whether or not Airflow
suits your needs. I think the answer to that is YES, but in the worst case
you'll have to write a custom operator to handle your needs precisely, e.g.
by processing all files that match a prefix "s3a://mybucket/{{date}}*".
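
For illustration, a rough sketch of the S3PrefixSensor approach (the bucket
name, prefix, and `dag` object are placeholders, the import path assumes
Airflow 1.9/1.10, and in practice the date portion of the prefix would likely
be templated):

from airflow.operators.sensors import S3PrefixSensor

# Wait for any key under a timestamped prefix to appear, e.g. folders like
# 20180425_111447_data1/.  The prefix is hard-coded here for clarity.
wait_for_prefix = S3PrefixSensor(
    task_id='wait_for_prefix',
    bucket_name='mybucket',
    prefix='20180425_',
    delimiter='/',
    poke_interval=60,
    timeout=60 * 60,
    dag=dag,
)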

On Mon, May 21, 2018 at 2:59 PM, purna pradeep 
wrote:

> + Joe
>
>
>
> On Mon, May 21, 2018 at 2:56 PM purna pradeep 
> wrote:
>
>> I do know only to some extent , I mean If you see my sample s3 locations
>>
>> s3a://mybucket/20180425_111447_data1/_SUCCESS
>>
>> s3a://mybucket/20180424_111241_data1/_SUCCESS
>>
>>
>>
>> The only values which are static in above location are
>>
>> s3a://mybucket/
>>
>> data1/_SUCCESS
>>
>> Now I want to configure tolerance for _SUCCESS file as latest or 1 day
>> older based on this configuration it should pick the right time stamp
>> folder which has _SUCCESS file
>>
>> On Mon, May 21, 2018 at 2:35 PM Joe Napolitano 
>> wrote:
>>
>>> Purna, with regards to "this path is not completely static," can you
>>> clarify what you mean?
>>>
>>> Do you mean that you don't know the actual key name beforehand? E.g.
>>> pertaining to "111447", "111241", and "111035" in your example?
>>>
>>> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
>>> br...@heisenbergwoodworking.com> wrote:
>>>
>>> > I suggest it’ll work for your needs.
>>> >
>>> > Sent from a device with less than stellar autocorrect
>>> >
>>> > > On May 21, 2018, at 10:16 AM, purna pradeep >> >
>>> > wrote:
>>> > >
>>> > > Hi ,
>>> > >
>>> > > I’m trying to evaluate airflow to see if it suits my needs.
>>> > >
>>> > > Basically i can have below steps in a DAG
>>> > >
>>> > >
>>> > >
>>> > > 1)Look for a file arrival on given s3 location (this path is not
>>> > completely
>>> > > static) (i can use S3Keysensor in this step)
>>> > >
>>> > >  i should be able to specify to look either for latest folder or
>>> 24hrs or
>>> > > n number of days older folder which has _SUCCESS file as mentioned
>>> below
>>> > >
>>> > >  sample file location(s):
>>> > >
>>> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
>>> > >
>>> > >
>>
>>
>> s3a://mybucket/20180424_111241_data1/_SUCCESS
>>> > >
>>> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
>>> > >
>>> > >
>>> > >
>>> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
>>> > > dependency is met ,i can set upstream for step2 as step1
>>> > >
>>> > >
>>> > >
>>> > > Does S3keysensor supports step1 out of the box?
>>> > >
>>> > > Also in some cases i may to have a DAG without start date & end date
>>> it
>>> > > just needs to be triggered once file is available in a given s3
>>> location
>>> > >
>>> > >
>>> > >
>>> > > *Please suggest !*
>>> >
>>>
>>


Re: S3keysonsor

2018-05-21 Thread Rajesh C
The sensor allows wild card (*) and there is also an S3PrefixSensor which
might help in some cases. In one of my dags, I have a similar structure.

wait_on_s3_source_data = S3KeySensor(
    task_id='wait_on_s3_source_data',
    bucket_key="s3://mybucket_name/data_file_name_{{macros.ds_format(macros.ds_add(ds,1),'%Y-%m-%d','%Y%m%d')}}*/_SUCCESS",
    wildcard_match=True,
    timeout=60*15,
    retries=4,
    dag=dag)

I have files like the below in my bucket. The date part is fixed/expected
to be exactly a match based on ds, the time part (after 8 digits) is
different

data_file_name_20170912181034/done_marker.csv
data_file_name_20170913181035/done_marker.csv
data_file_name_20170914181027/done_marker.csv
data_file_name_20170915181033/done_marker.csv

If you can receive a file with *any date* and you need to process it, you
might need to just use the wildcard on the whole bucket and then process
the file based on the name or the data inside. Maybe add a custom Python
function to archive the file after you process it as well.
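
For example, a rough sketch of that archiving step as a PythonOperator (the
bucket name, prefix, and `dag` object are placeholders, and it assumes boto3
can pick up AWS credentials from the environment):

import boto3
from airflow.operators.python_operator import PythonOperator

def archive_processed_keys(ds_nodash, **_):
    # Copy everything under the day's prefix to an archive/ prefix, then
    # delete the originals.  Names here are placeholders.
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('mybucket_name')
    for obj in bucket.objects.filter(Prefix='data_file_name_' + ds_nodash):
        bucket.copy({'Bucket': bucket.name, 'Key': obj.key}, 'archive/' + obj.key)
        obj.delete()

archive_task = PythonOperator(
    task_id='archive_processed_keys',
    python_callable=archive_processed_keys,
    provide_context=True,
    dag=dag,
)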

On Mon, May 21, 2018 at 2:59 PM purna pradeep 
wrote:

> + Joe
>
>
>
> On Mon, May 21, 2018 at 2:56 PM purna pradeep 
> wrote:
>
> > I do know only to some extent , I mean If you see my sample s3 locations
> >
> > s3a://mybucket/20180425_111447_data1/_SUCCESS
> >
> > s3a://mybucket/20180424_111241_data1/_SUCCESS
> >
> >
> >
> > The only values which are static in above location are
> >
> > s3a://mybucket/
> >
> > data1/_SUCCESS
> >
> > Now I want to configure tolerance for _SUCCESS file as latest or 1 day
> > older based on this configuration it should pick the right time stamp
> > folder which has _SUCCESS file
> >
> > On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <
> joe.napolit...@wework.com>
> > wrote:
> >
> >> Purna, with regards to "this path is not completely static," can you
> >> clarify what you mean?
> >>
> >> Do you mean that you don't know the actual key name beforehand? E.g.
> >> pertaining to "111447", "111241", and "111035" in your example?
> >>
> >> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
> >> br...@heisenbergwoodworking.com> wrote:
> >>
> >> > I suggest it’ll work for your needs.
> >> >
> >> > Sent from a device with less than stellar autocorrect
> >> >
> >> > > On May 21, 2018, at 10:16 AM, purna pradeep <
> purna2prad...@gmail.com>
> >> > wrote:
> >> > >
> >> > > Hi ,
> >> > >
> >> > > I’m trying to evaluate airflow to see if it suits my needs.
> >> > >
> >> > > Basically i can have below steps in a DAG
> >> > >
> >> > >
> >> > >
> >> > > 1)Look for a file arrival on given s3 location (this path is not
> >> > completely
> >> > > static) (i can use S3Keysensor in this step)
> >> > >
> >> > >  i should be able to specify to look either for latest folder or
> >> 24hrs or
> >> > > n number of days older folder which has _SUCCESS file as mentioned
> >> below
> >> > >
> >> > >  sample file location(s):
> >> > >
> >> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
> >> > >
> >> > >
> >
> >
> > s3a://mybucket/20180424_111241_data1/_SUCCESS
> >> > >
> >> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
> >> > >
> >> > >
> >> > >
> >> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
> >> > > dependency is met ,i can set upstream for step2 as step1
> >> > >
> >> > >
> >> > >
> >> > > Does S3keysensor supports step1 out of the box?
> >> > >
> >> > > Also in some cases i may to have a DAG without start date & end date
> >> it
> >> > > just needs to be triggered once file is available in a given s3
> >> location
> >> > >
> >> > >
> >> > >
> >> > > *Please suggest !*
> >> >
> >>
> >
>


Re: S3keysonsor

2018-05-21 Thread purna pradeep
+ Joe



On Mon, May 21, 2018 at 2:56 PM purna pradeep 
wrote:

> I do know only to some extent , I mean If you see my sample s3 locations
>
> s3a://mybucket/20180425_111447_data1/_SUCCESS
>
> s3a://mybucket/20180424_111241_data1/_SUCCESS
>
>
>
> The only values which are static in above location are
>
> s3a://mybucket/
>
> data1/_SUCCESS
>
> Now I want to configure tolerance for _SUCCESS file as latest or 1 day
> older based on this configuration it should pick the right time stamp
> folder which has _SUCCESS file
>
> On Mon, May 21, 2018 at 2:35 PM Joe Napolitano 
> wrote:
>
>> Purna, with regards to "this path is not completely static," can you
>> clarify what you mean?
>>
>> Do you mean that you don't know the actual key name beforehand? E.g.
>> pertaining to "111447", "111241", and "111035" in your example?
>>
>> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
>> br...@heisenbergwoodworking.com> wrote:
>>
>> > I suggest it’ll work for your needs.
>> >
>> > Sent from a device with less than stellar autocorrect
>> >
>> > > On May 21, 2018, at 10:16 AM, purna pradeep 
>> > wrote:
>> > >
>> > > Hi ,
>> > >
>> > > I’m trying to evaluate airflow to see if it suits my needs.
>> > >
>> > > Basically i can have below steps in a DAG
>> > >
>> > >
>> > >
>> > > 1)Look for a file arrival on given s3 location (this path is not
>> > completely
>> > > static) (i can use S3Keysensor in this step)
>> > >
>> > >  i should be able to specify to look either for latest folder or
>> 24hrs or
>> > > n number of days older folder which has _SUCCESS file as mentioned
>> below
>> > >
>> > >  sample file location(s):
>> > >
>> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
>> > >
>> > >
>
>
> s3a://mybucket/20180424_111241_data1/_SUCCESS
>> > >
>> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
>> > >
>> > >
>> > >
>> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
>> > > dependency is met ,i can set upstream for step2 as step1
>> > >
>> > >
>> > >
>> > > Does S3keysensor supports step1 out of the box?
>> > >
>> > > Also in some cases i may to have a DAG without start date & end date
>> it
>> > > just needs to be triggered once file is available in a given s3
>> location
>> > >
>> > >
>> > >
>> > > *Please suggest !*
>> >
>>
>


Re: S3keysonsor

2018-05-21 Thread purna pradeep
I only know it to some extent. I mean, if you look at my sample S3 locations:

s3a://mybucket/20180425_111447_data1/_SUCCESS

s3a://mybucket/20180424_111241_data1/_SUCCESS



The only values which are static in the above locations are

s3a://mybucket/

data1/_SUCCESS

Now I want to configure a tolerance for the _SUCCESS file as either the
latest or 1 day older; based on this configuration it should pick the right
timestamp folder which has the _SUCCESS file.
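
If nothing supports this out of the box, I'm imagining something roughly
like a small custom sensor. A sketch of the idea (the bucket, key suffix,
lookback handling, and import paths are my own assumptions, written against
Airflow 1.9/1.10 and boto3):

from datetime import timedelta

import boto3
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class SuccessMarkerSensor(BaseSensorOperator):
    # Pokes for a _SUCCESS marker under any timestamped prefix within a
    # configurable lookback window (hypothetical sketch).
    @apply_defaults
    def __init__(self, bucket, suffix='data1/_SUCCESS', lookback_days=1,
                 *args, **kwargs):
        super(SuccessMarkerSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.suffix = suffix
        self.lookback_days = lookback_days

    def poke(self, context):
        s3 = boto3.client('s3')
        execution_date = context['execution_date']
        for days_back in range(self.lookback_days + 1):
            day = (execution_date - timedelta(days=days_back)).strftime('%Y%m%d')
            resp = s3.list_objects_v2(Bucket=self.bucket, Prefix=day + '_')
            for obj in resp.get('Contents', []):
                if obj['Key'].endswith(self.suffix):
                    return True
        return False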

On Mon, May 21, 2018 at 2:35 PM Joe Napolitano 
wrote:

> Purna, with regards to "this path is not completely static," can you
> clarify what you mean?
>
> Do you mean that you don't know the actual key name beforehand? E.g.
> pertaining to "111447", "111241", and "111035" in your example?
>
> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
> br...@heisenbergwoodworking.com> wrote:
>
> > I suggest it’ll work for your needs.
> >
> > Sent from a device with less than stellar autocorrect
> >
> > > On May 21, 2018, at 10:16 AM, purna pradeep 
> > wrote:
> > >
> > > Hi ,
> > >
> > > I’m trying to evaluate airflow to see if it suits my needs.
> > >
> > > Basically i can have below steps in a DAG
> > >
> > >
> > >
> > > 1)Look for a file arrival on given s3 location (this path is not
> > completely
> > > static) (i can use S3Keysensor in this step)
> > >
> > >  i should be able to specify to look either for latest folder or 24hrs
> or
> > > n number of days older folder which has _SUCCESS file as mentioned
> below
> > >
> > >  sample file location(s):
> > >
> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
> > >
> > >


s3a://mybucket/20180424_111241_data1/_SUCCESS
> > >
> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
> > >
> > >
> > >
> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
> > > dependency is met ,i can set upstream for step2 as step1
> > >
> > >
> > >
> > > Does S3keysensor supports step1 out of the box?
> > >
> > > Also in some cases i may to have a DAG without start date & end date it
> > > just needs to be triggered once file is available in a given s3
> location
> > >
> > >
> > >
> > > *Please suggest !*
> >
>


Re: Dockerised CI and testing environment

2018-05-21 Thread Daniel Imberman
Hi Gerardo,

I left some comments on the PR. Could you please get the travis tests to
pass + rebase your PR? Afterwards I'd be glad to try it out.

On Mon, May 21, 2018 at 6:55 AM Gerardo Curiel  wrote:

> Hello folks,
>
> I just submitted a PR for using Docker as part of Airflow's build pipeline:
> https://github.com/apache/incubator-airflow/pull/3393
>
> Currently, running unit tests is a difficult process. Airflow tests depend
> on many external services and other custom setup, which makes it hard for
> contributors to work on this codebase. CI builds have also been unreliable,
> and it is hard to reproduce the causes. Having contributors trying to
> emulate the build environment every time makes it easier to get to an "it
> works on my machine" sort of situation. The proposed docker setup aims to
> simplify this.
>
> You can check the PR description, which goes into more detail. Now, I
> bring this to the list because I have a few requests:
>
> - Could you try the branch out on your local machines? The instructions are
> provided in the PR (pending: actual docs). I would love to get feedback
> about it.
> - Is anyone familiar with the impersonation tests? They have proven to be
> hard to fix with my limited knowledge of the codebase.
>
> It's WIP, but I wanted to submit what I've got so far and check if you guys
> think I'm going in the right direction.
>
> Cheers,
>
> --
> Gerardo Curiel // https://gerar.do
>


Re: S3keysonsor

2018-05-21 Thread Joe Napolitano
Purna, with regards to "this path is not completely static," can you
clarify what you mean?

Do you mean that you don't know the actual key name beforehand? E.g.
pertaining to "111447", "111241", and "111035" in your example?

On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
br...@heisenbergwoodworking.com> wrote:

> I suggest it’ll work for your needs.
>
> Sent from a device with less than stellar autocorrect
>
> > On May 21, 2018, at 10:16 AM, purna pradeep 
> wrote:
> >
> > Hi ,
> >
> > I’m trying to evaluate airflow to see if it suits my needs.
> >
> > Basically i can have below steps in a DAG
> >
> >
> >
> > 1)Look for a file arrival on given s3 location (this path is not
> completely
> > static) (i can use S3Keysensor in this step)
> >
> >  i should be able to specify to look either for latest folder or 24hrs or
> > n number of days older folder which has _SUCCESS file as mentioned below
> >
> >  sample file location(s):
> >
> >  s3a://mybucket/20180425_111447_data1/_SUCCESS
> >
> >  s3a://mybucket/20180424_111241_data1/_SUCCESS
> >
> >  s3a://mybucket/20180424_111035_data1/_SUCCESS
> >
> >
> >
> > 2)invoke a simple restapi using HttpSimpleOperator once the above
> > dependency is met ,i can set upstream for step2 as step1
> >
> >
> >
> > Does S3keysensor supports step1 out of the box?
> >
> > Also in some cases i may to have a DAG without start date & end date it
> > just needs to be triggered once file is available in a given s3 location
> >
> >
> >
> > *Please suggest !*
>


kwargs usage in BaseOperator

2018-05-21 Thread Tao Feng
Hi,

I have a question regarding kwargs usage in BaseOperator (
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2306-L2315).
Since this PR (https://github.com/apache/incubator-airflow/pull/1285) was
checked in, Airflow turns on deprecation warnings by default, and per the
comment, the community plans to remove kwargs usage in Airflow 2.0.

I wonder whether we still plan to remove kwargs in Airflow 2.0 (do we have
a timeline for Airflow 2.0?), as the warning generates too much noise for
our internal production environment as well as the Travis CI log.
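
For context, the kind of stopgap I'm considering in the meantime is just a
warnings filter in our DAG entrypoints. A sketch (it assumes the warning is
emitted as a PendingDeprecationWarning, which is my reading of models.py):

import warnings

# Blunt stopgap until the kwargs removal actually lands: suppress the
# deprecation category that BaseOperator emits for unrecognized kwargs.
warnings.simplefilter('ignore', PendingDeprecationWarning)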

Thanks,
-Tao


Re: S3keysonsor

2018-05-21 Thread Brian Greene
I suggest it’ll work for your needs.

Sent from a device with less than stellar autocorrect

> On May 21, 2018, at 10:16 AM, purna pradeep  wrote:
> 
> Hi ,
> 
> I’m trying to evaluate airflow to see if it suits my needs.
> 
> Basically i can have below steps in a DAG
> 
> 
> 
> 1)Look for a file arrival on given s3 location (this path is not completely
> static) (i can use S3Keysensor in this step)
> 
>  i should be able to specify to look either for latest folder or 24hrs or
> n number of days older folder which has _SUCCESS file as mentioned below
> 
>  sample file location(s):
> 
>  s3a://mybucket/20180425_111447_data1/_SUCCESS
> 
>  s3a://mybucket/20180424_111241_data1/_SUCCESS
> 
>  s3a://mybucket/20180424_111035_data1/_SUCCESS
> 
> 
> 
> 2)invoke a simple restapi using HttpSimpleOperator once the above
> dependency is met ,i can set upstream for step2 as step1
> 
> 
> 
> Does S3keysensor supports step1 out of the box?
> 
> Also in some cases i may to have a DAG without start date & end date it
> just needs to be triggered once file is available in a given s3 location
> 
> 
> 
> *Please suggest !*


Re: Reply: Airflow REST API proof of concept.

2018-05-21 Thread Maxime Beauchemin
Personally I think we should keep the architecture as simple as possible
and use the same web server for REST and UI. As mentioned FAB (Flask App
Builder) manages authentication and RBAC, so we can have consistent access
rights in the UI and CLI.
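
To illustrate what's already there, triggering a DAG run through the
experimental endpoints served by the same web server looks roughly like the
sketch below (the host, dag_id, and auth handling are placeholders for
whatever your deployment uses):

import json

import requests

AIRFLOW_URL = 'http://localhost:8080'

# POST to the experimental API that ships with the webserver to trigger a run.
resp = requests.post(
    '{}/api/experimental/dags/{}/dag_runs'.format(AIRFLOW_URL, 'example_dag'),
    data=json.dumps({'conf': {'triggered_by': 'rest_poc'}}),
    headers={'Content-Type': 'application/json'},
)
resp.raise_for_status()
print(resp.json())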

Max

On Fri, May 11, 2018 at 5:42 AM Luke Diment 
wrote:

> Ok thanks that’s awesome we will have a look and check it out thanks...
>
> 
>
> Sent from my iPhone
>
> > On 12/05/2018, at 12:38 AM, Driesprong, Fokko 
> wrote:
> >
> > Hi Luke,
> >
> > This is the REST api for the new UI:
> >
> https://github.com/apache/incubator-airflow/blob/master/airflow/www_rbac/api/experimental/endpoints.py
> >
> > RBAC = Role Based Access Control, the fine grained security model based
> on
> > the fabmanager. Recently we've added some endpoints to it. In the end
> also
> > all the GUI ajax calls should go by this API, instead of calling the
> flask
> > endpoints directly.
> >
> > Cheers, Fokko
> >
> >
> >
> >
> >
> >
> > 2018-05-11 14:34 GMT+02:00 Luke Diment :
> >
> >> Our build pipeline uses Jenkinsfile with Docker kubernetes and helm...we
> >> orchestrate deployment against our rest api and use junit to assert our
> >> results...fully programmatically against airflow...!
> >>
> >> Sent from my iPhone
> >>
> >>> On 12/05/2018, at 12:31 AM, Luke Diment 
> >> wrote:
> >>>
> >>> No it executes the backend airflow command line over HTTP giving
> >> developers room to freely interact with airflow programmatically...hence
> >> you can easily integration test your business logic...
> >>>
> >>> Sent from my iPhone
> >>>
>  On 12/05/2018, at 12:27 AM, Song Liu  wrote:
> 
>  So that this Java REST API server is talking to the meta db directly ?
>  
>  From: Luke Diment 
>  Sent: 11 May 2018 12:22
>  To: dev@airflow.incubator.apache.org
>  Subject: Fwd: Airflow REST API proof of concept.
> 
>  FYI.
> 
>  Sent from my iPhone
> 
>  Begin forwarded message:
> 
>  From: Luke Diment <luke.dim...@westpac.co.nz>
>  Date: 11 May 2018 at 1:02:43 PM NZST
>  To: dev-ow...@airflow.incubator.apache.org
>  Subject: Fw: Airflow REST API proof of concept.
> 
> 
>  FYI.
> 
>  
>  From: Luke Diment
>  Sent: Thursday, May 10, 2018 4:33 PM
>  To: dev-subscr...@airflow.incubator.apache.org
>  Subject: Airflow REST API proof of concept.
> 
> 
>  Hi Airflow contributors,
> 
> 
>  I am a Java developer/full stack and lots of other stuff at Westpac
> >> Bank New Zealand.
> 
> 
>  We currently use Airflow for task scheduling for a rather large
> >> integration project for financial risk assessment.
> 
> 
>  During our development phase we started to understand that a REST API
> >> in front of Airflow would be a great idea.
> 
> 
>  We realise that you guys have detailed that there will be a REST API at some
> >> stage.
> 
> 
>  We have already built a proof of concept REST API implementation in
> >> Java (of course...;-))...
> 
> 
>  We were wondering if your contributor group would find this helpful or
> >> if there would be any reason to continue such an API in Java.
> 
> 
>  We look forward to your response.  We can share the code if needed...
> 
> 
>  Thanks,
> 
> 
>  Luke Diment.
> 
> 
> 
> 
> 
>  The contents of this email and any attachments are confidential and
> may
> >> be legally privileged. If you are not the intended recipient please
> advise
> >> the sender immediately and delete the email and attachments. Any use,
> >> dissemination, reproduction or distribution of this email and any
> >> attachments by anyone other than the intended recipient is prohibited.
> >>>
> >>>
> >>>
> >>> The contents of this email and any attachments are confidential and may
> >> be legally privileged. If you are not the intended recipient please
> advise
> >> the sender immediately and delete the email and attachments. Any use,
> >> dissemination, reproduction or distribution of this email and any
> >> attachments by anyone other than the intended recipient is prohibited.
> >>
> >>
> >>
> >> The contents of this email and any attachments are confidential and may
> be
> >> legally privileged. If you are not the intended recipient please advise
> the
> >> sender immediately and delete the email and attachments. Any use,
> >> dissemination, reproduction or distribution of this email and any
> >> attachments by anyone other than the intended 

Re: celery problem, AttributeError: async

2018-05-21 Thread Bolke de Bruin
We will rebranch 1.10 from master. Sorry, I have been too busy with normal life 
to be able to follow up on the release of 1.10.

B.

> On 21 May 2018, at 19:54, Craig Rodrigues  wrote:
> 
> Kaxil,
> 
> Thanks for merging this into master.
> What is the procedure to get this into v1-10-test branch?
> I am heavily testing that branch right now, and want to deploy that branch to 
> my prod system
> this Friday.
> 
> --
> Craig
> 
> On 2018/05/21 14:52:33, Craig Rodrigues  wrote: 
>> I have submitted:
>> 
>> https://github.com/apache/incubator-airflow/pull/3388
>> 
>> --
>> Craig
> 



Re: Dags getting failed after 24 hours

2018-05-21 Thread Maxime Beauchemin
Even though it's possible to set an `execution_timeout` on any task and/or
a `dagrun_timeout` on DAG runs, by default these are all set to None (unless
you're somehow setting the DAG's default parameters in some other way).

Maybe you have some OS-level policies on long-running processes in your
environment? Anything in the logs? SIGKILL?
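
For reference, setting both timeouts explicitly looks roughly like this
sketch (the dag_id, schedule, and command are placeholders):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Give both the DAG run and the task more headroom than the expected 48 hours.
dag = DAG(
    dag_id='long_running_example',
    start_date=datetime(2018, 5, 1),
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=72),
)

long_task = BashOperator(
    task_id='long_task',
    bash_command='echo "placeholder for the long-running job"',
    execution_timeout=timedelta(hours=72),
    dag=dag,
)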

Max

On Mon, May 21, 2018 at 5:13 AM ramandu...@gmail.com 
wrote:

> Hi All,
> We have a long-running DAG which is expected to take around 48 hours. But
> we are observing that it gets killed by the Airflow scheduler after ~24 hrs.
> We are not setting any DAG/task execution timeout explicitly.
> Is there any default timeout value that gets used? We are using
> LocalExecutor mode.
> We checked in the Airflow code but execution timeout values seem to be set
> to 'None'
>
> Thanks,
> Raman Gupta
>


Re: celery problem, AttributeError: async

2018-05-21 Thread Craig Rodrigues
I have submitted:

https://github.com/apache/incubator-airflow/pull/3388

--
Craig

On Mon, May 21, 2018 at 7:00 AM Naik Kaxil  wrote:

> Thanks. Please do that.
>
> On 21/05/2018, 14:59, "Craig Rodrigues"  wrote:
>
> celery 4.1.1 was just released last night which has all the async
> problems fixed:
>
> https://github.com/celery/celery/commits/v4.1.1
>
> I'll test this out, and then submit a PR to bump airflow's celery
> version to 4.1.1
>
> --
> Craig
>
> On 2018/05/21 07:20:50, Craig Rodrigues  wrote:
> > Hi,
> >
> > I used a requirements.txt file with these three lines:
> >
> > git+
> https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> > celery>=4.2.0rc3
> > kombu>=4.2.0
> >
> >
> > I did
> >
> > pip install -r requirements.txt
> >
> > When I started my worker, I got:
> >
> > [2018-05-21 06:39:23,980] {__init__.py:48} INFO - Using executor
> CeleryExecutor
> > Traceback (most recent call last):
> >   File "/bin/airflow", line 32, in 
> > args.func(args)
> >   File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line
> 74, in wrapper
> > return f(*args, **kwargs)
> >   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line
> 959, in worker
> > worker.run(**options)
> >   File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line
> 257, in run
> > **kwargs)
> >   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py",
> line 101, in __init__
> > self.setup_instance(**self.prepare_args(**kwargs))
> >   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py",
> line 124, in setup_instance
> > self.should_use_eventloop() if use_eventloop is None
> >   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py",
> line 243, in should_use_eve
> > self._conninfo.transport.implements.async and
> >   File "/usr/lib/python2.7/site-packages/kombu/transport/base.py",
> line 125, in __getattr__
> > raise AttributeError(key)
> > AttributeError: async
> >
> >
> >
> > I reported this as a bug in celery here:
> >
> > https://github.com/celery/celery/issues/4747
> >
> > It turns out that this is a known problem.  It looks like in kombu,
> they
> > renamed a bunch of uses of "async" to "asynchronous", to not clash
> with the new
> > 'async' keyword in Python 3.7.
> >
> > In celery, they are catching up and doing the same rename.
> > Almost at the same time that I reported this bug, the issue was
> fixed in the master branch of celery:
> >
> >
> https://github.com/celery/celery/commit/c8ef7ad60b72a194654c58beb04a1d65cd0435ad
> >
> > Hopefully the versions of kombu and celery will stabilize so that it
> all works properly with airflow.
> >
> > --
> > Craig
> >
> >
> >
>
>
>
>
> Kaxil Naik
>
> Data Reply
> 2nd Floor, Nova South
> 160 Victoria Street, Westminster
> London SW1E 5LB - UK
> phone: +44 (0)20 7730 6000
> k.n...@reply.com
> www.reply.com
>


Re: celery problem, AttributeError: async

2018-05-21 Thread Naik Kaxil
Thanks. Please do that.

On 21/05/2018, 14:59, "Craig Rodrigues"  wrote:

celery 4.1.1 was just released last night which has all the async problems 
fixed:

https://github.com/celery/celery/commits/v4.1.1

I'll test this out, and then submit a PR to bump airflow's celery version 
to 4.1.1

--
Craig

On 2018/05/21 07:20:50, Craig Rodrigues  wrote: 
> Hi,
> 
> I used a requirements.txt file with these three lines:
> 
> 
git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> celery>=4.2.0rc3
> kombu>=4.2.0
> 
> 
> I did
> 
> pip install -r requirements.txt
> 
> When I started my worker, I got:
> 
> [2018-05-21 06:39:23,980] {__init__.py:48} INFO - Using executor 
CeleryExecutor
> Traceback (most recent call last):
>   File "/bin/airflow", line 32, in 
> args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, 
in wrapper
> return f(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 959, 
in worker
> worker.run(**options)
>   File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line 257, 
in run
> **kwargs)
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 
101, in __init__
> self.setup_instance(**self.prepare_args(**kwargs))
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 
124, in setup_instance
> self.should_use_eventloop() if use_eventloop is None
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 
243, in should_use_eve
> self._conninfo.transport.implements.async and
>   File "/usr/lib/python2.7/site-packages/kombu/transport/base.py", line 
125, in __getattr__
> raise AttributeError(key)
> AttributeError: async
> 
> 
> 
> I reported this as a bug in celery here:
> 
> https://github.com/celery/celery/issues/4747
> 
> It turns out that this is a known problem.  It looks like in kombu, they
> renamed a bunch of uses of "async" to "asynchronous", to not clash with 
the new
> 'async' keyword in Python 3.7.
> 
> In celery, they are catching up and doing the same rename.  
> Almost at the same time that I reported this bug, the issue was fixed in 
the master branch of celery:
> 
> 
https://github.com/celery/celery/commit/c8ef7ad60b72a194654c58beb04a1d65cd0435ad
> 
> Hopefully the versions of kombu and celery will stabilize so that it all 
works properly with airflow.
> 
> --
> Craig
> 
> 
> 




Kaxil Naik 

Data Reply
2nd Floor, Nova South
160 Victoria Street, Westminster
London SW1E 5LB - UK 
phone: +44 (0)20 7730 6000
k.n...@reply.com
www.reply.com


Re: celery problem, AttributeError: async

2018-05-21 Thread Craig Rodrigues
celery 4.1.1 was just released last night which has all the async problems 
fixed:

https://github.com/celery/celery/commits/v4.1.1

I'll test this out, and then submit a PR to bump airflow's celery version to 
4.1.1

--
Craig

On 2018/05/21 07:20:50, Craig Rodrigues  wrote: 
> Hi,
> 
> I used a requirements.txt file with these three lines:
> 
> git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> celery>=4.2.0rc3
> kombu>=4.2.0
> 
> 
> I did
> 
> pip install -r requirements.txt
> 
> When I started my worker, I got:
> 
> [2018-05-21 06:39:23,980] {__init__.py:48} INFO - Using executor 
> CeleryExecutor
> Traceback (most recent call last):
>   File "/bin/airflow", line 32, in 
> args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in 
> wrapper
> return f(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 959, in 
> worker
> worker.run(**options)
>   File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line 257, in 
> run
> **kwargs)
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 101, 
> in __init__
> self.setup_instance(**self.prepare_args(**kwargs))
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 124, 
> in setup_instance
> self.should_use_eventloop() if use_eventloop is None
>   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 243, 
> in should_use_eve
> self._conninfo.transport.implements.async and
>   File "/usr/lib/python2.7/site-packages/kombu/transport/base.py", line 125, 
> in __getattr__
> raise AttributeError(key)
> AttributeError: async
> 
> 
> 
> I reported this as a bug in celery here:
> 
> https://github.com/celery/celery/issues/4747
> 
> It turns out that this is a known problem.  It looks like in kombu, they
> renamed a bunch of uses of "async" to "asynchronous", to not clash with the 
> new
> 'async' keyword in Python 3.7.
> 
> In celery, they are catching up and doing the same rename.  
> Almost at the same time that I reported this bug, the issue was fixed in the 
> master branch of celery:
> 
> https://github.com/celery/celery/commit/c8ef7ad60b72a194654c58beb04a1d65cd0435ad
> 
> Hopefully the versions of kombu and celery will stabilize so that it all 
> works properly with airflow.
> 
> --
> Craig
> 
> 
> 


Dockerised CI and testing environment

2018-05-21 Thread Gerardo Curiel
Hello folks,

I just submitted a PR for using Docker as part of Airflow's build pipeline:
https://github.com/apache/incubator-airflow/pull/3393

Currently, running unit tests is a difficult process. Airflow tests depend
on many external services and other custom setup, which makes it hard for
contributors to work on this codebase. CI builds have also been unreliable,
and it is hard to reproduce the causes. Having contributors trying to
emulate the build environment every time makes it easier to get to an "it
works on my machine" sort of situation. The proposed docker setup aims to
simplify this.

You can check the PR description, which goes into more detail. Now, I
bring this to the list because I have a few requests:

- Could you try the branch out on your local machines? The instructions are
provided in the PR (pending: actual docs). I would love to get feedback
about it.
- Is anyone familiar with the impersonation tests? They have proven to be
hard to fix with my limited knowledge of the codebase.

It's WIP, but I wanted to submit what I've got so far and check if you guys
think I'm going in the right direction.

Cheers,

-- 
Gerardo Curiel // https://gerar.do


Dags getting failed after 24 hours

2018-05-21 Thread ramandumcs
Hi All,
We have a long-running DAG which is expected to take around 48 hours. But we
are observing that it gets killed by the Airflow scheduler after ~24 hrs. We
are not setting any DAG/task execution timeout explicitly.
Is there any default timeout value that gets used? We are using LocalExecutor
mode.
We checked the Airflow code, but the execution timeout values seem to be set
to 'None'.

Thanks,
Raman Gupta


celery problem: cannot override celery_broker_transport_options

2018-05-21 Thread Craig Rodrigues
Hi,

I used this requirements.txt file to install airflow from the v1-10-test branch:

git+https://github.com/celery/celery@master#egg=celery
git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
kombu>=4.1.0


In my airflow.cfg, I have:

[celery]
executor = CeleryExecutor

executor = CeleryExec
broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb

[celery_broker_transport_options]
#
#

However, if I manually run this code inside the webserver, I see:

python -c "from airflow import configuration; c = 
configuration.conf.getsection('celery_broker_transport_options'); print(c)"
OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), 
(u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])

My worker crashes with this error:


[2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key 
[celery/ssl_active] not found in config
[2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor will 
run without SSL
[2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor CeleryExecutor
[2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: 
TypeError(u"Invalid argument(s) 
'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to 
create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  
Please check that the keyword arguments are appropriate for this combination of 
components.",)
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in 
start
self.blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in 
start
step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in 
start
return self.obj.start()
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", 
line 322, in start
blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in 
start
step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 
41, in start
c.connection, on_decode_error=c.on_decode_error,
  File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in 
TaskConsumer
**kw
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in 
__init__
self.revive(self.channel)
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in 
revive
self.declare()
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in 
declare
queue.declare()
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
self._create_queue(nowait=nowait, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in 
_create_queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in 
queue_declare
nowait=nowait,
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 
531, in queue_declare
self._new_queue(queue, **kwargs)
  File 
"/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 
82, in _new_queue
self._get_or_create(queue)
  File 
"/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 
70, in _get_or_create
obj = self.session.query(self.queue_cls) \
  File 
"/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 
65, in session
_, Session = self._open()
  File 
"/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 
56, in _open
engine = self._engine_from_config()
  File 
"/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 
51, in _engine_from_config
return create_engine(conninfo.hostname, **transport_options)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 
391, in create_engine
return strategy.create(*args, **kwargs)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", 
line 160, in create
engineclass.__name__))
TypeError: Invalid argument(s) 
'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to 
create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  
Please check that the keyword arguments are appropriate for this combination of 
components.
 
-- celery@qa1 v4.2.0rc3 (windowlicker)
-- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12
   [config]
   .> app:         airflow.executors.celery_executor:0x4766d50
   .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
   .> results:     mysql://airflow:**@localhost:3306/airflow
   .> concurrency: 16 (prefork)

celery problem, AttributeError: async

2018-05-21 Thread Craig Rodrigues
Hi,

I used a requirements.txt file with these three lines:

git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
celery>=4.2.0rc3
kombu>=4.2.0


I did

pip install -r requirements.txt

When I started my worker, I got:

[2018-05-21 06:39:23,980] {__init__.py:48} INFO - Using executor CeleryExecutor
Traceback (most recent call last):
  File "/bin/airflow", line 32, in 
args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in 
wrapper
return f(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 959, in 
worker
worker.run(**options)
  File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line 257, in run
**kwargs)
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 101, in 
__init__
self.setup_instance(**self.prepare_args(**kwargs))
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 124, in 
setup_instance
self.should_use_eventloop() if use_eventloop is None
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 243, in 
should_use_eve
self._conninfo.transport.implements.async and
  File "/usr/lib/python2.7/site-packages/kombu/transport/base.py", line 125, in 
__getattr__
raise AttributeError(key)
AttributeError: async



I reported this as a bug in celery here:

https://github.com/celery/celery/issues/4747

It turns out that this is a known problem.  It looks like in kombu, they
renamed a bunch of uses of "async" to "asynchronous", to not clash with the new
'async' keyword in Python 3.7.

In celery, they are catching up and doing the same rename.  
Almost at the same time that I reported this bug, the issue was fixed in the 
master branch of celery:

https://github.com/celery/celery/commit/c8ef7ad60b72a194654c58beb04a1d65cd0435ad

Hopefully the versions of kombu and celery will stabilize so that it all works 
properly with airflow.

--
Craig