Re: Airflow with Celery

2018-05-15 Thread David Capwell
What I find is that when Celery rejects tasks we hit this.  For us we don't do
work on the hosts, so we solve it by over-provisioning tasks in Celery.

On Tue, May 15, 2018, 6:30 AM Andy Cooper  wrote:

> I have had very similar issues when there was a problem with the connection
> string pointing to the message broker. Triple check those connection
> strings and attempt to connect outside of airflow.
>
> On Tue, May 15, 2018 at 9:27 AM Goutham Pratapa 
> wrote:
>
> > Hi all,
> >
> > I have been using airflow with Celery executor in the background
> >
> > https://hastebin.com/sipecovomi.ini --> airflow.cfg
> >
> > https://hastebin.com/urutokuvoq.py   --> The dag I have been using
> >
> >
> >
> > This shows that the dag is always in running state.
> >
> >
> >
> >
> > Airflow flower shows nothing in the tasks or in the broker.
> >
> >
> > Did I miss anything? Can anyone help me in this regard?
> >
> >
> > --
> > Cheers !!!
> > Goutham Pratapa
> >
>


Re: Improving Airflow SLAs

2018-05-02 Thread David Capwell
We use SLAs as well; they work great for some DAGs and are painful for others.

We rely on sensors to validate the data is ready before we run, and each dag
waits on sensors for different times (one dag waits for 8 hours since it
expects data at the start of the day but tends to get it 8 hours later).  We
also have some nested dags that are about 10 tasks deep.
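
For illustration, this is roughly what that sensor gating looks like (a
minimal sketch assuming Airflow 1.8/1.9-era import paths; the dag id, sensor
choice, and timings are placeholders, not their actual setup):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import TimeDeltaSensor

# Gate the real work behind a sensor that waits 8 hours past the execution
# date, mirroring the "data usually lands ~8 hours late" case described above.
dag = DAG(
    dag_id="wait_for_late_data",        # illustrative name
    start_date=datetime(2018, 1, 1),
    schedule_interval="@daily",
)

wait_for_data = TimeDeltaSensor(
    task_id="wait_for_data",
    delta=timedelta(hours=8),           # don't even start until exec_date + 8h
    poke_interval=300,                  # re-check every 5 minutes
    timeout=12 * 60 * 60,               # give up after 12 hours
    dag=dag,
)

process = DummyOperator(task_id="process", dag=dag)
wait_for_data.set_downstream(process)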

In these two cases SLA warnings come very late since the semantics we get are
tied to DAG completion time; what we really want is what you were talking
about: expected execution times.

Also, SLAs trigger on backfills and manual reruns of tasks.

I see this as a critical feature for production monitoring so would love to
see this get improved

On Wed, May 2, 2018, 12:00 PM James Meickle  wrote:

> At Quantopian we use Airflow to produce artifacts based on the previous
> day's stock market data. These artifacts are required for us to trade on
> today's stock market. Therefore, I've been investing time in improving
> Airflow notifications (such as writing PagerDuty and Slack integrations).
> My attention has turned to Airflow's SLA system, which has some drawbacks
> for our use case:
>
> 1) Airflow SLAs are not skip-aware, so a task that has an SLA but is
> skipped for this execution date will still trigger emails/callbacks. This
> is a huge problem for us because we run almost no tasks on weekends (since
> the stock market isn't open).
>
> 2) Defining SLAs can be awkward because they are relative to the execution
> date instead of the task start time. There's no way to alert if a task runs
> for "more than an hour", for any non-trivial DAG. Instead you can only
> express "more than an hour from execution date".  The financial data we use
> varies in when it arrives, and how long it takes to process (data volume
> changes frequently); we also have tight timelines that make retries
> difficult, so we want to alert an operator while leaving the task running,
> rather than failing and then alerting.
>
> 3) SLA miss emails don't have a subject line containing the instance URL
> (important for us because we run the same DAGs in both staging/production)
> or the execution date they apply to. When opened, they can get hard to read
> for even a moderately sized DAG because they include a flat list of task
> instances that are unsorted (neither alpha nor topo). They are also lacking
> any links back to the Airflow instance.
>
> 4) SLA emails are not callbacks, and can't be turned off (other than either
> removing the SLA or removing the email attribute on the task instance). The
> way that SLA miss callbacks are defined is not intuitive, as in contrast to
> all other callbacks, they are DAG-level rather than task-level. Also, the
> call signature is poorly defined: for instance, two of the arguments are
> just strings produced from the other two arguments.
>
> I have some thoughts about ways to fix these issues:
>
> 1) I just consider this one a bug. If a task instance is skipped, that was
> intentional, and it should not trigger any alerts.
>
> 2) I think that the `sla=` parameter should be split into something like
> this:
>
> `expected_start`: Timedelta after execution date, representing when this
> task must have started by.
> `expected_finish`: Timedelta after execution date, representing when this
> task must have finished by.
> `expected_duration`: Timedelta after task start, representing how long it
> is expected to run including all retries.
>
> This would give better operator control over SLAs, particularly for tasks
> deeper in larger DAGs where exact ordering may be hard to predict.
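
As a purely hypothetical illustration of the proposed parameters (none of
these keyword arguments exist in Airflow today, and the operator, callable,
and dag below are placeholders):

# Hypothetical only -- illustrates the proposed semantics, not a real API.
ingest = PythonOperator(
    task_id="ingest_market_data",
    python_callable=ingest_fn,                 # placeholder callable
    expected_start=timedelta(hours=2),         # must start by exec_date + 2h
    expected_finish=timedelta(hours=4),        # must finish by exec_date + 4h
    expected_duration=timedelta(hours=1),      # should run no longer than 1h
    dag=dag,
)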
>
> 3) The emails should be improved to be more operator-friendly, and take
> into account that someone may get a callback for a DAG they don't know very
> well, or be paged by this notification.
>
> 4.1) All Airflow callbacks should support a list, rather than requiring a
> single function. (I've written a wrapper that does this, but it would be
> better for Airflow to just handle this itself.)
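
One possible shape for such a wrapper (a sketch only, not the author's actual
code; the notifier functions and the dag object are placeholders):

from airflow.operators.python_operator import PythonOperator

def multi_callback(*callbacks):
    """Fan a single Airflow callback slot out to several functions."""
    def _run_all(context):
        for cb in callbacks:
            cb(context)
    return _run_all

# Placeholder notifiers -- stand-ins for real Slack/PagerDuty integrations.
def notify_slack(context):
    print("slack: %s failed" % context["task_instance"])

def notify_pagerduty(context):
    print("pagerduty: %s failed" % context["task_instance"])

# Hypothetical usage on a task ('dag' is assumed to exist already):
build = PythonOperator(
    task_id="build_artifacts",
    python_callable=lambda: None,
    on_failure_callback=multi_callback(notify_slack, notify_pagerduty),
    dag=dag,
)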
>
> 4.2) SLA miss callbacks should be task callbacks that receive context, like
> all the other callbacks. Having a DAG figure out which tasks have missed
> SLAs collectively is fine, but getting SLA failures in a batched callback
> doesn't really make much sense. Per-task callbacks can be fired
> individually within a batch of failures detected at the same time.
>
> 4.3) SLA emails should be the default SLA miss callback function, rather
> than being hardcoded.
>
> Also, overall, the SLA miss logic is very complicated. It's stuffed into
> one overloaded function that is responsible for checking for SLA misses,
> creating database objects for them, filtering tasks, selecting emails,
> rendering, and sending. Refactoring it would be a good maintainability win.
>
> I am already implementing some of the above in a private branch, but I'd be
> curious to hear community feedback as to which of these suggestions might
> be desirable upstream. I could have this ready for Airflow 2.0 if there is
> interest be

AIRFLOW-1157 and 1.9.1

2018-04-04 Thread David Capwell
So I was bitten by this and found the JIRA which says it's resolved in 1.9.1,
but I don't see the commit in v1-9-stable or v1-9-test; what is the correct
branch for 1.9 fixes?

Thanks!


Re: schedule backfill jobs in reverse order

2018-04-02 Thread David Capwell
Nothing I know of.  The scheduler finds the latest execution then creates
the next one based off the interval; this is also why updates to start_date
have no effect (it doesn't try to fill gaps).
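
Roughly, the rule is just the following (an illustration of the behavior, not
the actual scheduler code):

from datetime import datetime, timedelta

def next_execution_date(latest_execution_date, schedule_interval):
    # Only the latest existing run matters; earlier gaps are never revisited,
    # which is why moving start_date backwards has no effect once runs exist.
    return latest_execution_date + schedule_interval

print(next_execution_date(datetime(2018, 4, 1), timedelta(days=1)))
# 2018-04-02 00:00:00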

On Mon, Apr 2, 2018, 11:26 AM Dennis O'Brien 
wrote:

> Hi folks,
>
> I recently asked this question on gitter but didn't get any feedback.
>
> Anyone know if there is a way to get the scheduler to reverse the order of
> the dag runs? By default a new DAG starts at start_date then moves
> sequentially forward in time until it is caught up (assuming catchup=True).
> The same is true for a new DAG just enabled, or a DAG that is cleared, and
> for a backfill.
>
> The behavior I'd like to get is for the scheduler to queue up the latest
> available, so it starts most recent, then moves back in time. If while the
> backfill is running a more recent DAG run is eligible, that one should be
> queued next.
>
> Is there any way to accomplish this?  Is this a feature that others would
> find useful?
>
> For some background, I have some jobs that make predictions and do a long
> backfill for historical backtesting, and that can mean no new predictions
> for a week depending on the job and the time to backfill.  Ideally the most
> recent jobs would take precedence over the historical jobs.
>
> thanks,
> Dennis
>


Re: Submitting 1000+ tasks to airflow programatically

2018-03-22 Thread David Capwell
For us we compile down to Python rather than doing the logic in Python; that
makes it so the load doesn't do real work.

We have our own DSL that is just a simplified compiler: parse, analyze,
optimize, code gen.  In code gen we just generate the Python code.  Our
build then packages it up and has airflow fetch it (very hacky fetch right
now).

This does make it so loading is simple and fast, but means you can't use
the Python api directly
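
As a purely illustrative sketch of the "compile down to Python" idea (this is
not their DSL or build tooling; the spec format, template, and names are
invented for the example), a trivial generator could emit a plain DAG file at
build time so that loading it does no real work:

TEMPLATE = '''\
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id={dag_id!r}, start_date=datetime(2018, 1, 1),
          schedule_interval={schedule!r})
{tasks}
'''

def generate(spec):
    # "Code gen" step: turn a declarative spec into static Python source.
    tasks = "\n".join(
        "BashOperator(task_id={id!r}, bash_command={cmd!r}, dag=dag)".format(**t)
        for t in spec["tasks"]
    )
    return TEMPLATE.format(dag_id=spec["dag_id"], schedule=spec["schedule"],
                           tasks=tasks)

spec = {"dag_id": "example", "schedule": "@daily",
        "tasks": [{"id": "extract", "cmd": "echo extract"},
                  {"id": "load", "cmd": "echo load"}]}

# At build time, write the generated source into the dags folder; at load time
# the scheduler only parses plain Python with no API calls or I/O.
print(generate(spec))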

On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire  wrote:

> I've had similar issues with large dags being slow to render on ui and
> crashing chrome.
>
> I got around it by changing the default tree view from 25 to just 5.
>
> Involves a couple changes to source files though, would be great if some of
> the ui defaults could go into airflow.cfg.
>
> https://stackoverflow.com/a/48665734/1919374
>
> On Thu, 22 Mar 2018, 01:26 Chris Fei,  wrote:
>
> > @Kyle, I do something similar and have run into the problems you've
> > mentioned. In my case, I access data from S3 and then generate separate
> > DAGs (of different structures) based on the data that's pulled. I've
> > also found that the UI for accessing a single large DAG is slow so I
> > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > that's responsible for accessing your API and caching the client IDs
> > somewhere locally, maybe just to a file on disk or as an Airflow
> > Variable. You can run this DAG on whatever schedule is appropriate for
> > you. From there, build a function that creates a DAG and then for each
> > client ID, register a DAG built by that function to the global context.
> > Like this:
> > def create_client_dag(client_id):
> > # build dag here
> >
> > def get_client_ids_locally():
> > # access the data that was pulled from the API
> >
> > client_ids = get_client_ids_locally()
> > for client in client_ids:
> > dag = create_client_dag(client)
> > globals()[dag.dag_id] = dag
> >
> > This approach also handles removing client IDs somewhat gracefully. DAGs
> > for removed clients will still appear in the UI (you can build a
> > maintenance DAG to clean that up), but they'll be disabled and their
> > tasks won't be scheduled.
> > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > Thanks for all the responses let me try to address the main themes.
> > >
> > > @Ace @Nicholas @Taylor
> > > I originally started with a loop over my list of client ids and created a
> > > SparkSubmitOperator for each client. The pseudo code would look something
> > > like this:
> > >
> > > dag = DAG(...)
> > >
> > > client_ids = get_client_ids()
> > > for client_id in client_ids:
> > >     SparkSubmitOperator(
> > >         ...
> > >         dag=dag
> > >     )
> > >
> > > I found this approach kind of clunky for a few reasons. First, the
> > > get_client_ids() function was hitting our API every time the dag was read
> > > by the scheduler, which seemed excessive (every 30 seconds or so?).
> > > Second, it seemed like a single task failure marked the whole dag as a
> > > failure, but I guess retrying till the task worked could solve this?
> > > Third, the UI gets really clunky and slow, basically unusable when it
> > > tries to render the graph view for that many tasks. Finally, Airflow
> > > doesn't seem very happy when client_ids are removed, i.e. when
> > > get_client_ids() no longer returns a specific client_id; it really seems
> > > to want a static dag.
> > >
> > > Do I really have to poll an API or database every 30 seconds for this
> > > dynamic client_id data?
> > >
> > > @Ace
> > > I have been limiting concurrency so as to not blast the cluster
> > >
> > > @Nicholas
> > > Thank you for the noise suggestion, I will definitely implement that if I
> > > continue with the same methodology
> > >
> > > @Taylor
> > > Are you using a SubDagOperator? Or is your process similar to the
> > > pseudo code I wrote above?
> > >
> > >
> > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston wrote:
> > >> We also use a similar approach to generate dynamic DAGs based on a common
> > >> template DAG file.  We pull in the list of config objects, one per DAG,
> > >> from an internal API lightly wrapping the database, then we cache that
> > >> response in an Airflow Variable that gets updated once a minute.  The
> > >> dynamic DAGs are generated from that variable.
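
A minimal sketch of that pattern (the Variable name, dag naming, and
placeholder task are assumptions, not their actual code):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

def create_dag(config):
    # One DAG per config object; the dag_id is derived from the config.
    dag = DAG(
        dag_id="client_%s" % config["client_id"],
        start_date=datetime(2018, 1, 1),
        schedule_interval="@daily",
    )
    DummyOperator(task_id="placeholder", dag=dag)
    return dag

# The Variable is assumed to be refreshed out of band (e.g. once a minute by
# another process), so loading here stays cheap and never hits the API.
configs = Variable.get("client_configs", default_var=[], deserialize_json=True)
for config in configs:
    dag = create_dag(config)
    globals()[dag.dag_id] = dag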
> > >>
> > >> *Taylor Edmiston*
> > >> TEdmiston.com | Blog | Stack Overflow CV | LinkedIn | AngelList
> > >>
> > >>
> > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > >> nicolas.ki...@weightwatchers.com> wrote:
> > >>
> > >>> Kyle,
> > >>>
> > >>> We have a similar approach but on a much, much smaller scale. We
> > >>> now have <100 “things to process” but expect it to grow to under

Re: Running dag run doesn't schedule task

2018-03-19 Thread David Capwell
Current theory is priority; this dag is high fanout whereas other DAGs are
much deeper.  Looking at the scheduler code and database this looks hard to
prove since I only see logs for success and the DB doesn't distinguish between
runnable and not scheduled; is there a good way to check schedule delay?

On Mon, Mar 19, 2018, 6:15 PM David Capwell  wrote:

> Ignore that, it must be something with splunk since stdout doesn't have a
> date field; the same process writing to a file is printing that out and
> "Filling" is before that line...
>
> On Mon, Mar 19, 2018, 5:35 PM David Capwell  wrote:
>
>> This is weird and hope not bad utc conversion tricking me
>>
>>
>> So splunk logs for worker shows the process logs were created at 9am
>> ("Logging into: "), the first entry of the log was at 14:00 ("Filling
>> up the DagBag").  If I go to the DB and calculate queue time this specific
>> dag was delayed 5 hours which matches the logs...
>>
>>
>>
>> On Mon, Mar 19, 2018, 9:10 AM David Capwell  wrote:
>>
>>> The major reason we have been waiting was mostly because 1.8.2 and 1.9
>>> are backwards incompatible (don't remember off the top of my head but one
>>> operator broke important so everything failed for us), so neglected doing
>>> the work to support both versions (need to support both since different
>>> teams move at different rates).
>>>
>>> We need to do this anyways (frozen in time is very bad).
>>>
>>> On Mon, Mar 19, 2018, 1:47 AM Driesprong, Fokko 
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> First I would update to Apache Airflow 1.9.0, there have been a lot of
>>>> fixes between 1.8.2 and 1.9.0. Just to see if the bug is still in there.
>>>>
>>>> Cheers, Fokko
>>>>
>>>> 2018-03-18 19:41 GMT+01:00 David Capwell :
>>>>
>>>> > Thanks for the reply
>>>> >
>>>> > Our script doesn't set it so should be off; the process does not
>>>> normally
>>>> > restart (monitoring has a counter for number of restarts since deploy,
>>>> > currently as 0)
>>>> >
>>>> > At the point in time the UI showed the upstream tasks as green
>>>> (success);
>>>> > we manually ran tasks so no longer in the same state, so can't check
>>>> UI
>>>> > right now
>>>> >
>>>> > On Sun, Mar 18, 2018, 11:34 AM Bolke de Bruin 
>>>> wrote:
>>>> >
>>>> > > Are you running with num_runs? If so disable it. We have seen this
>>>> > > behavior with num_runs. Also you can find out by clicking on the
>>>> task if
>>>> > > there is a dependency issue.
>>>> > >
>>>> > > B.
>>>> > >
>>>> > > Verstuurd vanaf mijn iPad
>>>> > >
>>>> > > > Op 18 mrt. 2018 om 19:08 heeft David Capwell 
>>>> het
>>>> > > volgende geschreven:
>>>> > > >
>>>> > > > We just started seeing this a few days ago after turning on SLA
>>>> for our
>>>> > > > tasks (not saying SLA did this, may have been happening before
>>>> and not
>>>> > > > noticing), but we have a dag that runs once a hour and we see
>>>> that 4-5
>>>> > > dag
>>>> > > > runs are marked running but tasks are not getting scheduled.
>>>> When we
>>>> > get
>>>> > > > the SLA alert the action we are doing right now is going to the
>>>> UI and
>>>> > > > clicking run on tasks manually; this is only needed for the
>>>> oldest dag
>>>> > > run
>>>> > > > and the rest recover after that. In the past 3 days this has
>>>> happened
>>>> > > twice
>>>> > > > to us.
>>>> > > >
>>>> > > > We are running 1.8.2, are there any known jira about this? Don't
>>>> know
>>>> > > > scheduler well, what could I do to see why these tasks are getting
>>>> > > skipped
>>>> > > > without manual intervention?
>>>> > > >
>>>> > > > Thanks for your time.
>>>> > >
>>>> >
>>>>
>>>


Re: Running dag run doesn't schedule task

2018-03-19 Thread David Capwell
Ignore that, it must be something with splunk since stdout doesn't have a date
field; the same process writing to a file is printing that out and "Filling"
is before that line...

On Mon, Mar 19, 2018, 5:35 PM David Capwell  wrote:

> This is weird and hope not bad utc conversion tricking me
>
>
> So splunk logs for worker shows the process logs were created at 9am
> ("Logging into: "), the first entry of the log was at 14:00 ("Filling
> up the DagBag").  If I go to the DB and calculate queue time this specific
> dag was delayed 5 hours which matches the logs...
>
>
>
> On Mon, Mar 19, 2018, 9:10 AM David Capwell  wrote:
>
>> The major reason we have been waiting was mostly because 1.8.2 and 1.9
>> are backwards incompatible (don't remember off the top of my head but one
>> operator broke important so everything failed for us), so neglected doing
>> the work to support both versions (need to support both since different
>> teams move at different rates).
>>
>> We need to do this anyways (frozen in time is very bad).
>>
>> On Mon, Mar 19, 2018, 1:47 AM Driesprong, Fokko 
>> wrote:
>>
>>> Hi David,
>>>
>>> First I would update to Apache Airflow 1.9.0, there have been a lot of
>>> fixes between 1.8.2 and 1.9.0. Just to see if the bug is still in there.
>>>
>>> Cheers, Fokko
>>>
>>> 2018-03-18 19:41 GMT+01:00 David Capwell :
>>>
>>> > Thanks for the reply
>>> >
>>> > Our script doesn't set it so should be off; the process does not
>>> normally
>>> > restart (monitoring has a counter for number of restarts since deploy,
>>> > currently as 0)
>>> >
>>> > At the point in time the UI showed the upstream tasks as green
>>> (success);
>>> > we manually ran tasks so no longer in the same state, so can't check UI
>>> > right now
>>> >
>>> > On Sun, Mar 18, 2018, 11:34 AM Bolke de Bruin 
>>> wrote:
>>> >
>>> > > Are you running with num_runs? If so disable it. We have seen this
>>> > > behavior with num_runs. Also you can find out by clicking on the
>>> task if
>>> > > there is a dependency issue.
>>> > >
>>> > > B.
>>> > >
>>> > > Verstuurd vanaf mijn iPad
>>> > >
>>> > > > Op 18 mrt. 2018 om 19:08 heeft David Capwell 
>>> het
>>> > > volgende geschreven:
>>> > > >
>>> > > > We just started seeing this a few days ago after turning on SLA
>>> for our
>>> > > > tasks (not saying SLA did this, may have been happening before and
>>> not
>>> > > > noticing), but we have a dag that runs once a hour and we see that
>>> 4-5
>>> > > dag
>>> > > > runs are marked running but tasks are not getting scheduled.  When
>>> we
>>> > get
>>> > > > the SLA alert the action we are doing right now is going to the UI
>>> and
>>> > > > clicking run on tasks manually; this is only needed for the oldest
>>> dag
>>> > > run
>>> > > > and the rest recover after that. In the past 3 days this has
>>> happened
>>> > > twice
>>> > > > to us.
>>> > > >
>>> > > > We are running 1.8.2, are there any known jira about this? Don't
>>> know
>>> > > > scheduler well, what could I do to see why these tasks are getting
>>> > > skipped
>>> > > > without manual intervention?
>>> > > >
>>> > > > Thanks for your time.
>>> > >
>>> >
>>>
>>


Re: Running dag run doesn't schedule task

2018-03-19 Thread David Capwell
This is weird, and I hope it's not a bad UTC conversion tricking me.


So the splunk logs for the worker show the process logs were created at 9am
("Logging into: "), while the first entry of the log was at 14:00 ("Filling
up the DagBag").  If I go to the DB and calculate queue time, this specific
dag was delayed 5 hours, which matches the logs...



On Mon, Mar 19, 2018, 9:10 AM David Capwell  wrote:

> The major reason we have been waiting was mostly because 1.8.2 and 1.9 are
> backwards incompatible (don't remember off the top of my head but one
> operator broke important so everything failed for us), so neglected doing
> the work to support both versions (need to support both since different
> teams move at different rates).
>
> We need to do this anyways (frozen in time is very bad).
>
> On Mon, Mar 19, 2018, 1:47 AM Driesprong, Fokko 
> wrote:
>
>> Hi David,
>>
>> First I would update to Apache Airflow 1.9.0, there have been a lot of
>> fixes between 1.8.2 and 1.9.0. Just to see if the bug is still in there.
>>
>> Cheers, Fokko
>>
>> 2018-03-18 19:41 GMT+01:00 David Capwell :
>>
>> > Thanks for the reply
>> >
>> > Our script doesn't set it so should be off; the process does not
>> normally
>> > restart (monitoring has a counter for number of restarts since deploy,
>> > currently as 0)
>> >
>> > At the point in time the UI showed the upstream tasks as green
>> (success);
>> > we manually ran tasks so no longer in the same state, so can't check UI
>> > right now
>> >
>> > On Sun, Mar 18, 2018, 11:34 AM Bolke de Bruin 
>> wrote:
>> >
>> > > Are you running with num_runs? If so disable it. We have seen this
>> > > behavior with num_runs. Also you can find out by clicking on the task
>> if
>> > > there is a dependency issue.
>> > >
>> > > B.
>> > >
>> > > Verstuurd vanaf mijn iPad
>> > >
>> > > > Op 18 mrt. 2018 om 19:08 heeft David Capwell 
>> het
>> > > volgende geschreven:
>> > > >
>> > > > We just started seeing this a few days ago after turning on SLA for
>> our
>> > > > tasks (not saying SLA did this, may have been happening before and
>> not
>> > > > noticing), but we have a dag that runs once a hour and we see that
>> 4-5
>> > > dag
>> > > > runs are marked running but tasks are not getting scheduled.  When
>> we
>> > get
>> > > > the SLA alert the action we are doing right now is going to the UI
>> and
>> > > > clicking run on tasks manually; this is only needed for the oldest
>> dag
>> > > run
>> > > > and the rest recover after that. In the past 3 days this has
>> happened
>> > > twice
>> > > > to us.
>> > > >
>> > > > We are running 1.8.2, are there any known jira about this? Don't
>> know
>> > > > scheduler well, what could I do to see why these tasks are getting
>> > > skipped
>> > > > without manual intervention?
>> > > >
>> > > > Thanks for your time.
>> > >
>> >
>>
>


Re: Running dag run doesn't schedule task

2018-03-19 Thread David Capwell
The major reason we have been waiting was mostly because 1.8.2 and 1.9 are
backwards incompatible (I don't remember off the top of my head, but one
operator broke imports so everything failed for us), so we neglected doing
the work to support both versions (we need to support both since different
teams move at different rates).

We need to do this anyways (frozen in time is very bad).

On Mon, Mar 19, 2018, 1:47 AM Driesprong, Fokko 
wrote:

> Hi David,
>
> First I would update to Apache Airflow 1.9.0, there have been a lot of
> fixes between 1.8.2 and 1.9.0. Just to see if the bug is still in there.
>
> Cheers, Fokko
>
> 2018-03-18 19:41 GMT+01:00 David Capwell :
>
> > Thanks for the reply
> >
> > Our script doesn't set it so should be off; the process does not normally
> > restart (monitoring has a counter for number of restarts since deploy,
> > currently as 0)
> >
> > At the point in time the UI showed the upstream tasks as green (success);
> > we manually ran tasks so no longer in the same state, so can't check UI
> > right now
> >
> > On Sun, Mar 18, 2018, 11:34 AM Bolke de Bruin  wrote:
> >
> > > Are you running with num_runs? If so disable it. We have seen this
> > > behavior with num_runs. Also you can find out by clicking on the task
> if
> > > there is a dependency issue.
> > >
> > > B.
> > >
> > > Verstuurd vanaf mijn iPad
> > >
> > > > Op 18 mrt. 2018 om 19:08 heeft David Capwell 
> het
> > > volgende geschreven:
> > > >
> > > > We just started seeing this a few days ago after turning on SLA for
> our
> > > > tasks (not saying SLA did this, may have been happening before and
> not
> > > > noticing), but we have a dag that runs once a hour and we see that
> 4-5
> > > dag
> > > > runs are marked running but tasks are not getting scheduled.  When we
> > get
> > > > the SLA alert the action we are doing right now is going to the UI
> and
> > > > clicking run on tasks manually; this is only needed for the oldest
> dag
> > > run
> > > > and the rest recover after that. In the past 3 days this has happened
> > > twice
> > > > to us.
> > > >
> > > > We are running 1.8.2, are there any known jira about this? Don't know
> > > > scheduler well, what could I do to see why these tasks are getting
> > > skipped
> > > > without manual intervention?
> > > >
> > > > Thanks for your time.
> > >
> >
>


Re: Running dag run doesn't schedule task

2018-03-18 Thread David Capwell
Thanks for the reply

Our script doesn't set it so it should be off; the process does not normally
restart (monitoring has a counter for the number of restarts since deploy,
currently at 0).

At that point in time the UI showed the upstream tasks as green (success);
we manually ran tasks so we are no longer in the same state, so I can't check
the UI right now.

On Sun, Mar 18, 2018, 11:34 AM Bolke de Bruin  wrote:

> Are you running with num_runs? If so disable it. We have seen this
> behavior with num_runs. Also you can find out by clicking on the task if
> there is a dependency issue.
>
> B.
>
> Verstuurd vanaf mijn iPad
>
> > Op 18 mrt. 2018 om 19:08 heeft David Capwell  het
> volgende geschreven:
> >
> > We just started seeing this a few days ago after turning on SLA for our
> > tasks (not saying SLA did this, may have been happening before and not
> > noticing), but we have a dag that runs once a hour and we see that 4-5
> dag
> > runs are marked running but tasks are not getting scheduled.  When we get
> > the SLA alert the action we are doing right now is going to the UI and
> > clicking run on tasks manually; this is only needed for the oldest dag
> run
> > and the rest recover after that. In the past 3 days this has happened
> twice
> > to us.
> >
> > We are running 1.8.2, are there any known jira about this? Don't know
> > scheduler well, what could I do to see why these tasks are getting
> skipped
> > without manual intervention?
> >
> > Thanks for your time.
>


Running dag run doesn't schedule task

2018-03-18 Thread David Capwell
We just started seeing this a few days ago after turning on SLAs for our
tasks (not saying SLAs did this, it may have been happening before without us
noticing), but we have a dag that runs once an hour and we see that 4-5 dag
runs are marked running but tasks are not getting scheduled.  When we get
the SLA alert, the action we are taking right now is going to the UI and
clicking run on tasks manually; this is only needed for the oldest dag run
and the rest recover after that. In the past 3 days this has happened twice
to us.

We are running 1.8.2; are there any known JIRAs about this? I don't know the
scheduler well, so what could I do to see why these tasks are getting skipped
without manual intervention?

Thanks for your time.


Re: How to add hooks for strong deployment consistency?

2018-03-01 Thread David Capwell
We need two versions but most likely would not use either... That being
artifactory and git (would really love for this to be pluggable!)

We have our own dag fetch logic which right now pulls from git, caches,
then redirect airflow to that directory.  For us we have airflow automated
so you push a button to get a cluster, for this reason there are enough
instances that we have DDOS attacked git (opps).

We are planning to change this to fetch from artifactory, and have a
stateful proxy for each cluster so we stop DDOS attacking core
infrastructure.

On Mar 1, 2018 11:45 AM, "William Wong"  wrote:

Also relatively new to Airflow here. Same as David above, Option 1 is not
an option for us either for the same reasons.

What I would like to see is that it can be user selectable / modifiable.

Use Case:
We have a DAG with thousands of task dependencies/tasks. After 24hrs of
progressing, we need to take a subset of those tasks and rerun them with a
different configuration (reasons range from incorrect parameters to
infrastructure issues, doesn't really matter here).

What I hope can happen:
1. Pause DAG
2. Upload and tag newest dag version
3. Set dag_run to use latest tag,
4. Resolve DAG sync using 
5. Unpause DAG

I do like the DagFetcher idea. This logic should shim in nicely in the
DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about
the GitDagFetcher:
- I probably won't use fuse across 100's of nodes in my k8s/swarm. Not sure
how this would work without too much trouble.
- It might be confusing if some git SHAs have no changes to a DAG. All
existing runs will be marked as outdated? Probably better than nothing
anyway.

I also vote to have some form of caching behavior. I prefer not to
read in DAGs all the time, i.e. from the webserver, scheduler, *and* all
workers before starting any task, over and over again. This is because,
unfortunately, the assumption that a DAG only takes seconds to load does
not hold true for large dags. With only 10k tasks within a DAG it's already
on the order of minutes. This would be untenable as we scale up to even
larger dags. (Though, I'm testing a fix for this so maybe this might not
actually be an issue anymore.)

FWIW, it seems to me that the DagPickle feature (which, for the life of me, I
can't seem to get to work; no wonder it's being deprecated) would have
solved a lot of these issues fairly easily. Something along the lines of
adding pickle_id to dag_run should at least help the scheduler identify the DAG
version to load and queue. But I'm not sure if it can delete out-of-sync
task instances.

Lastly, sorry for the brain dump and derailing the topic, for the workers,
it seems that importing/loading in the DAG just to execute a single task is
a bit overkill isn't it? If we kept a caching feature (i.e. pickling),
perhaps we can simply cache the task and not worry about the rest of the
DAG tasks?

Will

On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <

maximebeauche...@gmail.com> wrote:

> I'm curious to hear which DagFetcher abstraction people would build or want
> to use.
>
> So far it sounded like the most popular and flexible approach would be a
> `GitDagFetcher` where all SHAs and refs become a possibility, as opposed to
> say a TarballOnS3DagFetcher which would require more manual artifact
> management and versioning, which represent additional [human] workflow on
> top of the already existing git-based workflow.
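
For concreteness, one way such an abstraction could be shaped (purely
illustrative; none of these classes exist in Airflow and the method names are
invented):

import abc

class DagFetcher(object):
    """Illustrative interface only -- nothing like this exists in Airflow."""
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def fetch_dags(self, version=None):
        """Return a mapping of dag_id -> DAG, optionally pinned to a version
        (a git SHA/ref, artifact version, tarball key, etc.)."""

class FileSystemDagFetcher(DagFetcher):
    def __init__(self, dag_folder):
        self.dag_folder = dag_folder

    def fetch_dags(self, version=None):
        # No version semantics: always whatever is on disk right now.
        from airflow.models import DagBag
        return DagBag(self.dag_folder).dags

class GitRepoDagFetcher(DagFetcher):
    def __init__(self, repo_url):
        self.repo_url = repo_url

    def fetch_dags(self, version=None):
        # 'version' would be a SHA or ref: check out (or fuse-mount) that ref,
        # load a DagBag from it, and return its dags. Left unimplemented here.
        raise NotImplementedError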
>
> One way I've seen this done before is by using this Git fuse (file system
> in user space) hack that creates a virtual filesystem where all SHAs and
> refs in the Git repo are exposed as a subfolder, and under each ref
> subfolder the whole repo sits as of that ref. Of course all the files are
> virtual and fetched at access time by the virtual filesystem using the git
> api. So if you simply point the DagBag loader to the right [virtual]
> directory, it will import the right version of the DAG. In the git world,
> the alternative to that is managing temp folders and doing shallow clones
> which seems like much more of a headache. Note that one tradeoff is that
> git and whatever it depends on then needs to be highly available.
>
> Max
>
> On Wed, Feb 28, 2018 at 6:55 PM, David Capwell  wrote:
>
> > Thanks for all the details! With a pluggable fetcher we would be able to
> > add our own logic for how to fetch, so it sounds like a good place to start
> > for something like this!
> >
> > On Wed, Feb 28, 2018, 4:39 PM Joy Gao  wrote:
> >
> > > +1 on DagFetcher abstraction, very airflow-esque :)
> > >
> > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > >  wrote:
> > > > Addressing a few of your questions / concerns:
> > > >
> > > > * The scheduler uses a

Re: How to add hooks for strong deployment consistency?

2018-02-28 Thread David Capwell
> >> > One thing we've been talking about is the need for a "DagFetcher"
> >> > abstraction, where its first implementation, which would replace and
> >> > mimic the current one, would be "FileSystemDagFetcher". One specific
> >> > DagFetcher implementation may or may not support version semantics, but
> >> > if it does it should be able to receive a version id and return the
> >> > proper version of the DAG object. For instance that first
> >> > "FileSystemDagFetcher" would not support version semantics, but perhaps
> >> > a "GitRepoDagFetcher" would, or an "ArtifactoryDagFetcher" or
> >> > "TarballInS3DagFetcher" may as well.
> >> >
> >> > Of course that assumes that the scheduler knows and stores the active
> >> > version number when generating a new DagRun, and that this information
> >> > is leveraged on subsequent scheduler cycles and on workers when tasks
> >> > are executed.
> >> >
> >> > This could also enable things like "remote" backfills (non-local,
> >> > parallelized) of a DAG definition that's on an arbitrary git ref
> >> > (assuming a "GitRepoDagFetcher").
> >> >
> >> > There are [perhaps] unintuitive implications where clearing a single
> >> > task would then re-run the old DAG definition on that task (since the
> >> > version was stamped in the DagRun and hasn't changed), but
> >> > deleting/recreating a DagRun would run the latest version (or any other
> >> > version that may be specified, for that matter).
> >> >
> >> > I'm unclear on how much work that represents exactly, but it's certainly
> >> > doable and may only require changing part of the DagBag class and a few
> >> > other places.
> >> >
> >> > Max
> >> >
> >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell 
> >> wrote:
> >> >
> >> > > Thanks for your feedback!
> >> > >
> >> > > Option 1 is a non-starter for us. The reason is we have DAGs that
> take
> >> 9+
> >> > > hours to run.
> >> > >
> >> > > Option 2 is more where my mind was going, but it's rather large.
> How I
> >> > see
> >> > > it you need a MVCC DagBag that's aware of multiple versions (what
> >> > provides
> >> > > version?).  Assuming you can track active dag runs pointing to which
> >> > > versions you know how to cleanup (fine with external).  The pro
> here is
> >> > you
> >> > > have snapshot isolation for dag_run, con is more bookkeeping and
> >> require
> >> > > deploy to work with this (last part may be a good thing though).
> >> > >
> >> > > The only other option I can think of is to lock deploy so the system
> >> only
> >> > > picks up new versions when no dag_run holds the lock.  This is
> flawed
> >> for
> >> > > many reasons, but breaks horrible for dag_runs that takes minutes (I
> >> > assume
> >> > > 99% do).
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao  wrote:
> >> > >
> >> > > > Hi David!
> >> > > >
> >> > > > Thank you for clarifying, I think I understand your concern now.
> We
> >> > > > currently also work around this by making sure a dag is turned off
> >> > > > when we deploy a new version. We also make sure our jobs are
> >> > > > idempotent and retry-enabled in the case when we forget to turn
> off
> >> > > > the job, so the issue hasn't caused us too much headache.
> >> > > >
> >> > > > I do agree that it would be nice for Airflow to have the option to
> >> > > > guarantee a single version of dag per dag run. I see two
> approaches:
> >> > > >
> >> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> >> > > > (2) If a dag is updated, the current dagrun continues but uses
> >> version
> >> > > > before the update.
> >> > > >
> >> > > > (1) requires some mechanism to compare dag generations. One
> optio

Re: How to add hooks for strong deployment consistency?

2018-02-27 Thread David Capwell
Thanks for your feedback!

Option 1 is a non-starter for us. The reason is we have DAGs that take 9+
hours to run.

Option 2 is more where my mind was going, but it's rather large.  How I see
it, you need an MVCC DagBag that's aware of multiple versions (what provides
the version?).  Assuming you can track which versions active dag runs point
to, you know how to clean up (fine with doing that externally).  The pro here
is you have snapshot isolation for a dag_run; the con is more bookkeeping and
requiring deploys to work with this (the last part may be a good thing
though).

The only other option I can think of is to lock deploys so the system only
picks up new versions when no dag_run holds the lock.  This is flawed for
many reasons, but breaks horribly for dag_runs that take minutes (I assume
99% do).



On Tue, Feb 27, 2018, 4:50 PM Joy Gao  wrote:

> Hi David!
>
> Thank you for clarifying, I think I understand your concern now. We
> currently also work around this by making sure a dag is turned off
> when we deploy a new version. We also make sure our jobs are
> idempotent and retry-enabled in the case when we forget to turn off
> the job, so the issue hasn't caused us too much headache.
>
> I do agree that it would be nice for Airflow to have the option to
> guarantee a single version of dag per dag run. I see two approaches:
>
> (1) If a dag is updated, the current dagrun fails and/or retries.
> (2) If a dag is updated, the current dagrun continues but uses version
> before the update.
>
> (1) requires some mechanism to compare dag generations. One option is
> to hash the dag file, store that value in the dagrun table, and
> compare against it each time a task is running. In the case the
> hash value is different, update the hash value, then fail/retry the
> dag. I think this is a fairly safe approach.
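
A rough sketch of that hashing idea (assuming a new column on the dag_run
table to hold the stored hash; the helper names are invented):

import hashlib

def dag_file_fingerprint(fileloc):
    # Hash the current contents of the DAG file on disk.
    with open(fileloc, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def dag_changed_since(dag, stored_fingerprint):
    # 'stored_fingerprint' would be the hash stamped on the dag_run row when
    # the run was created; a mismatch would trigger the fail/retry behavior
    # described in option (1).
    return dag_file_fingerprint(dag.fileloc) != stored_fingerprint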
>
> (2) is trickier. A dag only has a property "fileloc" which tracks the
> location of the dag file, but the actual content of the dag file is
> never versioned. When a task instance starts running, it dynamically
> re-processes the dag file specified by the fileloc, generate all the
> task objects from the dag file, and fetch the task object by task_id
> in order to execute it. So in order to guarantee each dagrun to run a
> specific version, previous versions must be maintained on disk somehow
> (maintaining this information in memory is difficult, since if the
> scheduler/worker shuts down, that information is lost). This makes it
> a pretty big change, and I haven't thought much on how to implement
> it.
>
> I'm personally leaning towards (1) for sake of simplicity. Note that
> some users may not want dag to fail/retry even when dag is updated, so
> this should be an optional feature, not required.
>
> My scheduler-foo isn't that great, so curious what others have to say
> about this.
>
> On Fri, Feb 23, 2018 at 3:12 PM, David Capwell  wrote:
> > Thanks for the reply Joy, let me walk you though things as they are today
> >
> > 1) we don't stop airflow or disable DAGs while deploying updates to
> logic,
> > this is done live once its released
> > 2) the python script in the DAG folder doesn't actually have DAGs in it
> but
> > is a shim layer to allow us to deploy in a atomic way for a single host
> >   2.1) this script reads a file on local disk (less than disk page size)
> to
> > find latest git commit deployed
> >   2.2) re-does the airflow DAG load process but pointing to the git
> commit
> > path
> >
> > Example directory structure
> >
> > /airflow/dags/shim.py
> > /airflow/real_dags/
> > /latest # pointer to latest commit
> > /[git commit]/
> >
> > This is how we make sure deploys are consistent within a single task.
> >
> >
> > Now, lets assume we have a fully atomic commit process and are able to
> > upgrade DAGs at the exact same moment.
> >
> > At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1,
> > and Task2
> > At time T1 Task1 is picked up by Worker1, so starts executing the task
> (V1
> > logic)
> > At time T2 deploy commit happens, current DAG version: V2
> > At time T3, Task2 is picked up by Worker2, so starts executing the task
> (V2
> > logic)
> >
> > In many cases this isn't really a problem (tuning config change to hadoop
> > job), but as we have more people using Airflow this is causing a lot of
> > time spent debugging why production acted differently than expected (the
> > problem was already fixed... why is it still here?).  We also see that
> some
> > tasks expect a given behavior from other tasks,

Re: How to add hooks for strong deployment consistency?

2018-02-23 Thread David Capwell
Thanks for the reply Joy, let me walk you through things as they are today

1) we don't stop airflow or disable DAGs while deploying updates to logic;
this is done live once it's released
2) the python script in the DAG folder doesn't actually have DAGs in it but
is a shim layer to allow us to deploy in an atomic way for a single host
  2.1) this script reads a file on local disk (less than a disk page size) to
find the latest git commit deployed
  2.2) it re-does the airflow DAG load process but pointing to the git commit
path

Example directory structure

/airflow/dags/shim.py
/airflow/real_dags/
/latest # pointer to latest commit
/[git commit]/

This is how we make sure deploys are consistent within a single task.
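
A minimal sketch of what such a shim can look like (the base path, metadata
file name, and helper below are assumptions, not the exact script described
above):

import os

from airflow.models import DagBag

BASE = "/airflow/real_dags"

def current_commit():
    # Small metadata file, written atomically on deploy, naming the live commit.
    with open(os.path.join(BASE, "latest")) as f:
        return f.read().strip()

# Re-run Airflow's own DAG loading against the pinned commit directory and
# expose every DAG it finds at module level so the scheduler picks them up.
dag_bag = DagBag(os.path.join(BASE, current_commit()))
for dag_id, dag in dag_bag.dags.items():
    globals()[dag_id] = dag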


Now, lets assume we have a fully atomic commit process and are able to
upgrade DAGs at the exact same moment.

At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1,
and Task2
At time T1 Task1 is picked up by Worker1, so starts executing the task (V1
logic)
At time T2 deploy commit happens, current DAG version: V2
At time T3, Task2 is picked up by Worker2, so starts executing the task (V2
logic)

In many cases this isn't really a problem (a tuning config change to a hadoop
job), but as we have more people using Airflow this is causing a lot of
time spent debugging why production acted differently than expected (the
problem was already fixed... why is it still here?).  We also see that some
tasks expect a given behavior from other tasks, and since they live in the
same git repo people can modify both tasks at the same time if a breaking
change is needed, but when this rolls out to prod there isn't a way to do
this other than turn off the DAG and log in to all hosts to verify it is fully
deployed.

We would like to remove this confusion and make generations/versions (same
thing really) exposed to users and make sure for a single dag_run only one
version is used.

I hope this is more clear.

On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao  wrote:

> Hi David,
>
> Do you mind providing a concrete example of the scenario in which
> scheduler/workers see different states (I'm not 100% sure if I understood
> the issue at hand).
>
> And by same dag generation, are you referring to the dag version? (DAG
> version is currently not supported at all, but I can see it being a
> building block for future use cases).
>
> Joy
>
> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell  wrote:
>
> > My current thinking is to add a field to the dag table that is optional
> and
> > provided by the dag. We currently intercept the load path do could use
> this
> > field to make sure we load the same generation.  My concern here is the
> > interaction with the scheduler, not as familiar with that logic to
> predict
> > corner cases were this would fail.
> >
> > Any other recommendations for how this could be done?
> >
> > On Mon, Feb 19, 2018, 10:33 PM David Capwell  wrote:
> >
> > > We have been using airflow for logic that delegates to other systems so
> > > inject a task all tasks depends to make sure all resources used are the
> > > same for all tasks in the dag. This works well for tasks that delegates
> > to
> > > external systems but people are starting to need to run logic in
> airflow
> > > and the fact that scheduler and all workers can see different states is
> > > causing issues
> > >
> > > We can make sure that all the code is deployed in a consistent way but
> > > need help from the scheduler to tell the workers the current generation
> > for
> > > a DAG.
> > >
> > > My question is, what would be the best way to modify airflow to allow
> > DAGs
> > > to define a generation value that the scheduler could send to workers?
> > >
> > > Thanks
> > >
> >
>


Re: How to add hooks for strong deployment consistency?

2018-02-23 Thread David Capwell
My current thinking is to add a field to the dag table that is optional and
provided by the dag. We currently intercept the load path so we could use this
field to make sure we load the same generation.  My concern here is the
interaction with the scheduler; I'm not familiar enough with that logic to
predict corner cases where this would fail.

Any other recommendations for how this could be done?

On Mon, Feb 19, 2018, 10:33 PM David Capwell  wrote:

> We have been using airflow for logic that delegates to other systems so
> inject a task all tasks depends to make sure all resources used are the
> same for all tasks in the dag. This works well for tasks that delegates to
> external systems but people are starting to need to run logic in airflow
> and the fact that scheduler and all workers can see different states is
> causing issues
>
> We can make sure that all the code is deployed in a consistent way but
> need help from the scheduler to tell the workers the current generation for
> a DAG.
>
> My question is, what would be the best way to modify airflow to allow DAGs
> to define a generation value that the scheduler could send to workers?
>
> Thanks
>


How to add hooks for strong deployment consistency?

2018-02-19 Thread David Capwell
We have been using airflow for logic that delegates to other systems, so we
inject a task that all tasks depend on to make sure all resources used are the
same for all tasks in the dag. This works well for tasks that delegate to
external systems, but people are starting to need to run logic in airflow,
and the fact that the scheduler and all workers can see different states is
causing issues.

We can make sure that all the code is deployed in a consistent way but need
help from the scheduler to tell the workers the current generation for a
DAG.

My question is, what would be the best way to modify airflow to allow DAGs
to define a generation value that the scheduler could send to workers?

Thanks


Re: Rerunning task without cleaning DB?

2018-02-07 Thread David Capwell
Ananth, I am not familiar with that and couldn't find any reference in the
code; can you say more?

On Feb 7, 2018 3:02 PM, "Trent Robbins"  wrote:

> If you want to keep the rest of your history you can:
>
> 1. turn the DAG off
> 2. delete its bad tasks, delete the bad DAG run
> 3. turn the DAG on
> 4. let it backfill or hit the play button manually depending on your needs
>
> Unfortunately this does not keep the task you are working with, but it's
> better than dropping the database by far.
>
>
>
>
>
> Best,
>
> Trent Robbins
> Strategic Consultant for Open Source Software
> Tau Informatics LLC
> desk: 415-404-9452
> cell: 513-233-5651
> tr...@tauinformatics.com
> https://www.linkedin.com/in/trentrobbins
>
> On Wed, Feb 7, 2018 at 2:57 PM, Ananth Durai  wrote:
>
> > We can't do that, unfortunately. Airflow schedule the task based on the
> > current state in the DB. If you would like to preserve the history one
> > option would be to add instrumentation on airflow_local_settings.py
> >
> > Regards,
> > Ananth.P,
> >
> >
> >
> >
> >
> >
> > On 5 February 2018 at 13:09, David Capwell  wrote:
> >
> > > When a production issue happens it's common that we clear the history
> to
> > > get airflow to run the task again.  This is problematic since it throws
> > > away the history making finding out what real happened harder.
> > >
> > > Is there any way to rerun a task without deleting from the DB?
> > >
> >
>


Rerunning task without cleaning DB?

2018-02-05 Thread David Capwell
When a production issue happens it's common that we clear the history to
get airflow to run the task again.  This is problematic since it throws
away the history, making it harder to find out what really happened.

Is there any way to rerun a task without deleting from the DB?


Re: Automatic DAGs deployment

2017-11-07 Thread David Capwell
@devjyoti is that not the existing behavior anyways? The scheduler only
knows about the top level Python script that links this together, and that
script is expected to emit all DAGs or they get dropped.

If this is taxing for the scheduler, I don't notice it; the box it runs on is
mostly idle.

On Nov 7, 2017 6:22 PM, "Devjyoti Patra"  wrote:

@david, in your approach, doesn't it become taxing for the scheduler to
parse the source files of the entire project to create the Dag? If the
scheduler is invoked every 30 secs, it will have to read the project every
time.

I like the approach and would like to use it if this is not a concern.

On Nov 7, 2017 9:52 PM, "David Capwell"  wrote:

> For us we use git commits to solve this for single node (don't have
> distributed consistency), single task case (two tasks on same node may see
> different state).
>
> What we do is we install the whole code in the dag dir as the following
>
> DAG_DIR//.
>
> There is a metadata file we update when we deploy (atomic write, no
partial
> reads) that points to latest commit. In the DAG_DIR we have a Python
script
> that knows about this structure, reads the meta file, and loads the
commit.
>
> We also inject a "deploy" SubDag into every DAG that makes sure that all
> the resources (our concept, stuff from things like artifactory) needed are
> used for the life of the execution (including rerunning at a later date).
> Only bring you up since we have thought to do the same trick to solve
multi
> node case, but would need something like a two phase commit to make sure
> all nodes have the code else it will fail.
>
> On Nov 7, 2017 6:30 AM, "Grant Nicholas" <
> grantnicholas2...@u.northwestern.edu> wrote:
>
> > +1
> >
> > Long term would be awesome if airflow supported upgrades of in flight
> dags
> > with a hashing/versioning setup.
> >
> > But as a first step, would be good to document how we want people to
> > upgrade dags. (Or at least a disclaimer talking about the pitfalls).
> >
> >
> > On Nov 6, 2017 3:08 PM, "Daniel Imberman" 
> > wrote:
> >
> > > +1 for this conversation.
> > >
> > > I know that most of the production airflow instances basically just
> have
> > a
> > > policy of "don't update the DAG files while a job is running."
> > >
> > > One thing that is difficult with this, however, is that for
> > CeleryExecutor
> > > and KubernetesExecutor we don't really have any power over the DAG
> > > refreshes. If you're storing your DAGs in s3 or NFS, we can't stop or
> > > trigger a refresh of the DAGs. I'd be interested to see what others
> have
> > > done for this and if there's anything we can do to standardize this.
> > >
> > > On Mon, Nov 6, 2017 at 12:34 PM Gaetan Semet 
> wrote:
> > >
> > > > Hello
> > > >
> > > > I am working with Airflow to see how we can use it in my company,
> and I
> > > > volunteer to help you if you need help on some parts. I used to work
> a
> > > lot
> > > > with Python and Twisted, but real, distributed scheduling is kind of
> a
> > > new
> > > > sport for me.
> > > >
> > > > I see that deploying DAGs regularly is not as easy as we can
> imagine. I
> > > > started playing with git-sync and apparently it is not recommended
in
> > > prod
> > > > since it can lead to an incoherent state if the scheduler is
> refreshed
> > in
> > > > the middle of the execution. But DAGs lives and they can be updated
> by
> > > > users and I think Airflow needs a way to allow automatic refresh of
> the
> > > > DAGs without having to stop the scheduler.
> > > >
> > > > Does anyone already works on it, or do you have a set of JIRA ticket
> > > > covering this issue so I can start working on it ?
> > > >
> > > > Best Regards,
> > > > Gaetan Semet
> > > >
> > >
> >
>


Re: Automatic DAGs deployment

2017-11-07 Thread David Capwell
For us we use git commits to solve this for the single-node (we don't have
distributed consistency), single-task case (two tasks on the same node may see
different state).

What we do is we install the whole code in the dag dir as the following

DAG_DIR/[git commit]/...

There is a metadata file we update when we deploy (atomic write, no partial
reads) that points to latest commit. In the DAG_DIR we have a Python script
that knows about this structure, reads the meta file, and loads the commit.

We also inject a "deploy" SubDag into every DAG that makes sure that all
the resources (our concept, stuff from things like artifactory) needed are
used for the life of the execution (including rerunning at a later date).
I only bring this up since we have thought to do the same trick to solve the
multi-node case, but it would need something like a two-phase commit to make
sure all nodes have the code, else it will fail.

On Nov 7, 2017 6:30 AM, "Grant Nicholas" <
grantnicholas2...@u.northwestern.edu> wrote:

> +1
>
> Long term would be awesome if airflow supported upgrades of in flight dags
> with a hashing/versioning setup.
>
> But as a first step, would be good to document how we want people to
> upgrade dags. (Or at least a disclaimer talking about the pitfalls).
>
>
> On Nov 6, 2017 3:08 PM, "Daniel Imberman" 
> wrote:
>
> > +1 for this conversation.
> >
> > I know that most of the production airflow instances basically just have
> a
> > policy of "don't update the DAG files while a job is running."
> >
> > One thing that is difficult with this, however, is that for
> CeleryExecutor
> > and KubernetesExecutor we don't really have any power over the DAG
> > refreshes. If you're storing your DAGs in s3 or NFS, we can't stop or
> > trigger a refresh of the DAGs. I'd be interested to see what others have
> > done for this and if there's anything we can do to standardize this.
> >
> > On Mon, Nov 6, 2017 at 12:34 PM Gaetan Semet  wrote:
> >
> > > Hello
> > >
> > > I am working with Airflow to see how we can use it in my company, and I
> > > volunteer to help you if you need help on some parts. I used to work a
> > lot
> > > with Python and Twisted, but real, distributed scheduling is kind of a
> > new
> > > sport for me.
> > >
> > > I see that deploying DAGs regularly is not as easy as we can imagine. I
> > > started playing with git-sync and apparently it is not recommended in prod
> > > since it can lead to an incoherent state if the scheduler is refreshed in
> > > the middle of the execution. But DAGs live and they can be updated by
> > > users, and I think Airflow needs a way to allow automatic refresh of the
> > > DAGs without having to stop the scheduler.
> > >
> > > Does anyone already work on it, or do you have a set of JIRA tickets
> > > covering this issue so I can start working on it?
> > >
> > > Best Regards,
> > > Gaetan Semet
> > >
> >
>


Re: Airflow stops reading stdout of forked process with BashOperator

2017-10-06 Thread David Capwell
Python version is 2.7.6

On Oct 4, 2017 9:52 AM, "Driesprong, Fokko"  wrote:

> Hi David,
>
> Thank you for the question. The problem for the Spark-sql hook seems
> related
> <https://issues.apache.org/jira/browse/AIRFLOW-1647>, but the issue is
> different. In the spark-sql hook, the problem was that there were two
> iterators, one for the stdout and one for the stderr. First the stdout one
> was being read, and only after hitting an EOF would the stderr iterator be
> emptied. This caused the stderr to grow quickly (since the stdout was
> being read first). This was fixed by redirecting the stderr to stdout and
> using a single iterator. This is already the case in the BashOperator.
>
> The iterator should be read by Airflow, so the stdout buffer should be
> read. What version of Python are you using?
>
> Cheers, Fokko
>
>
>
>
>
> 2017-10-03 7:55 GMT+02:00 Bolke de Bruin :
>
> > Probably a buffer is full or not emptied in time (as you mentioned). Ie.
> > If we’re reading from stderr but the stdout is full it gets stuck. This
> was
> > fixed for the SparkOperators but we might need to do the same here.
> >
> > Bolke
> >
> > Verstuurd vanaf mijn iPad
> >
> > > Op 3 okt. 2017 om 03:02 heeft David Capwell  het
> > volgende geschreven:
> > >
> > > We use the bash operator to call a Java command line. We notice that
> > > sometimes the task stays running a long time (never stops) and that the
> > > logs in airflow stop getting updated for the task. After debugging a bit
> > > it turns out that the jvm is blocked on the stdout FD since the buffer
> > > is full. I manually cleaned the buffer (just called cat to dump the
> > > buffer) and see the jvm halts cleanly, but the task stays stuck in
> > > airflow; airflow run is still running but the forked process is not.
> > >
> > > Walking the code in bash_operator I see that airflow creates a shell
> > > script then has bash run it. I see in the logs the location of the
> > > script but I don't see it on the file system. I didn't check when the
> > > process was hung so I don't know if bash was running or not.
> > >
> > > We have seen this a few times. Any idea what's going on? I'm new to
> > > debugging Python, and ptrace is disabled in our env so I can't find a
> > > way to get the state of the airflow run command.
> > >
> > > Thanks for any help!
> > >
> > > Airflow version: 1.8.0 and 1.8.2 (the above was on 1.8.2 but we see this
> > > on the 1.8.0 cluster as well)
> >
>
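
For reference, the single-iterator pattern Fokko describes (stderr merged
into stdout, one loop draining the pipe so the child can never block on a
full buffer) looks roughly like this sketch; the Java command line is
illustrative only:

import subprocess

# Launch the child with stderr redirected into stdout, mirroring what the
# BashOperator does, so there is only one pipe to drain.
proc = subprocess.Popen(
    ["java", "-jar", "my-job.jar"],  # hypothetical command line
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)

# Keep reading the single pipe until EOF so the child never blocks on a
# full stdout buffer; log each line as it arrives.
for line in iter(proc.stdout.readline, b""):
    print(line.rstrip().decode("utf-8", "replace"))

print("exit code: {}".format(proc.wait()))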


Airflow stops reading stdout of forked process with BashOperator

2017-10-02 Thread David Capwell
We use the bash operator to call a Java command line. We notice that
sometimes the task stays running a long time (never stops) and that the logs
in airflow stop getting updated for the task. After debugging a bit it turns
out that the jvm is blocked on the stdout FD since the buffer is full. I
manually cleaned the buffer (just called cat to dump the buffer) and see the
jvm halts cleanly, but the task stays stuck in airflow; airflow run is still
running but the forked process is not.

Walking the code in bash_operator I see that airflow creates a shell script
then has bash run it. I see in the logs the location of the script but I
don't see it on the file system. I didn't check when the process was hung so
I don't know if bash was running or not.

We have seen this a few times. Any idea what's going on? I'm new to debugging
Python, and ptrace is disabled in our env so I can't find a way to get the
state of the airflow run command.

Thanks for any help!

Airflow version: 1.8.0 and 1.8.2 (above was on 1.8.2 but we see this on
1.8.0 cluster as well)


Re: Upgrading to 1.8.2 fails to load variables page

2017-09-07 Thread David Capwell
So the JIRA linked isn't really related, since that only gets triggered on
POST and not GET.

I locally patched to do a None check, but I was wondering what changed that
caused this behavior. I agree a None key makes no sense, but if one does
exist the UI doesn't load, which forces the user to go to the DB directly to
clean up.

Thanks for your time!
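
For anyone else who hits this, the local patch mentioned above amounts to a
None guard along these lines; the function body is paraphrased from the
traceback, and the sensitive-field list is illustrative, not copied from
airflow/www/views.py:

# Illustrative stand-in for the hard-coded list in views.py.
DEFAULT_SENSITIVE_VARIABLE_FIELDS = ('password', 'secret', 'api_key')


def should_hide_value_for_key(key_name):
    # Guard against Variable rows whose key is NULL in the DB; without it,
    # `s in key_name` raises "TypeError: argument of type 'NoneType' is not
    # iterable" and the whole variables page fails to render.
    if key_name is None:
        return False
    return any(s in key_name for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS)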

On Sep 7, 2017 9:21 AM, "David Capwell"  wrote:

> Going into a python repl I see the following when I list the DB state
>
> yaml.repo.update.frequency : [encrypted data]
> foo : [encrypted data]
> None : [encrypted data]
>
> On Thu, Sep 7, 2017 at 9:16 AM, David Capwell  wrote:
>
>> I just deployed 1.8.2 to a test cluster that was running 1.8.0 and the
>> below stacktrace is all I get when I try to view the variables page
>>
>> Looking at it and searching JIRA I found
>> https://issues.apache.org/jira/browse/AIRFLOW-1200, which looks like it's
>> trying to block things from being created, but doesn't seem to handle it
>> when the DB has something it doesn't like.
>>
>> Traceback (most recent call last):
>>   File "/lib/python2.7/site-packages/flask/app.py", line 1988, in
>> wsgi_app
>> response = self.full_dispatch_request()
>>   File "/lib/python2.7/site-packages/flask/app.py", line 1641, in
>> full_dispatch_request
>> rv = self.handle_user_exception(e)
>>   File "/lib/python2.7/site-packages/flask/app.py", line 1544, in
>> handle_user_exception
>> reraise(exc_type, exc_value, tb)
>>   File "/lib/python2.7/site-packages/flask/app.py", line 1639, in
>> full_dispatch_request
>> rv = self.dispatch_request()
>>   File "/lib/python2.7/site-packages/flask/app.py", line 1625, in
>> dispatch_request
>> return self.view_functions[rule.endpoint](**req.view_args)
>>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 69, in
>> inner
>> return self._run_view(f, *args, **kwargs)
>>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 368, in
>> _run_view
>> return fn(self, *args, **kwargs)
>>   File "/lib/python2.7/site-packages/flask_admin/model/base.py", line
>> 1900, in index_view
>> return_url=self._get_list_url(view_args),
>>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 308, in
>> render
>> return render_template(template, **kwargs)
>>   File "/lib/python2.7/site-packages/flask/templating.py", line 134, in
>> render_template
>> context, ctx.app)
>>   File "/lib/python2.7/site-packages/flask/templating.py", line 116, in
>> _render
>> rv = template.render(context)
>>   File "/lib/python2.7/site-packages/jinja2/environment.py", line 989,
>> in render
>> return self.environment.handle_exception(exc_info, True)
>>   File "/lib/python2.7/site-packages/jinja2/environment.py", line 754,
>> in handle_exception
>> reraise(exc_type, exc_value, tb)
>>   File 
>> "/lib/python2.7/site-packages/airflow/www/templates/airflow/variable_list.html",
>> line 18, in top-level template code
>> {% extends 'admin/model/list.html' %}
>>   File 
>> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
>> line 6, in top-level template code
>> {% import 'admin/model/row_actions.html' as row_actions with context
>> %}
>>   File 
>> "/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
>> line 18, in top-level template code
>> {% extends 'admin/base.html' %}
>>   File 
>> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html",
>> line 30, in top-level template code
>> {% block page_body %}
>>   File 
>> "/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
>> line 96, in block "page_body"
>> {% block body %}
>>   File 
>> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
>> line 62, in block "body"
>> {% block model_list_table %}
>>   File 
>> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
>> line 110, in block "model_list_table"
>> {% block list_row scoped %}
>>   File 
>> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
>> line 138, in block "list_row"
>> {{ get_value(row, 

Re: Upgrading to 1.8.2 fails to load variables page

2017-09-07 Thread David Capwell
Going into a python repl I see the following when I list the DB state

yaml.repo.update.frequency : [encrypted data]
foo : [encrypted data]
None : [encrypted data]
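
The listing above came from poking at the ORM directly; a rough sketch of the
same inspection plus removing the NULL-keyed row (model and session names as
in Airflow 1.8, so treat them as assumptions and back up the DB first):

from airflow import settings
from airflow.models import Variable

session = settings.Session()

# List what is actually in the variable table.
for var in session.query(Variable).all():
    print(var.key)

# Remove the row whose key is NULL, which is what breaks the variables page.
session.query(Variable).filter(Variable.key.is_(None)).delete(
    synchronize_session=False)
session.commit()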

On Thu, Sep 7, 2017 at 9:16 AM, David Capwell  wrote:

> I just deployed 1.8.2 to a test cluster that was running 1.8.0 and the
> below stacktrace is all I get when I try to view the variables page
>
> Looking at it and searching JIRA I found
> https://issues.apache.org/jira/browse/AIRFLOW-1200, which looks like it's
> trying to block things from being created, but doesn't seem to handle it
> when the DB has something it doesn't like.
>
> Traceback (most recent call last):
>   File "/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app
> response = self.full_dispatch_request()
>   File "/lib/python2.7/site-packages/flask/app.py", line 1641, in
> full_dispatch_request
> rv = self.handle_user_exception(e)
>   File "/lib/python2.7/site-packages/flask/app.py", line 1544, in
> handle_user_exception
> reraise(exc_type, exc_value, tb)
>   File "/lib/python2.7/site-packages/flask/app.py", line 1639, in
> full_dispatch_request
> rv = self.dispatch_request()
>   File "/lib/python2.7/site-packages/flask/app.py", line 1625, in
> dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 69, in
> inner
> return self._run_view(f, *args, **kwargs)
>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 368, in
> _run_view
> return fn(self, *args, **kwargs)
>   File "/lib/python2.7/site-packages/flask_admin/model/base.py", line
> 1900, in index_view
> return_url=self._get_list_url(view_args),
>   File "/lib/python2.7/site-packages/flask_admin/base.py", line 308, in
> render
> return render_template(template, **kwargs)
>   File "/lib/python2.7/site-packages/flask/templating.py", line 134, in
> render_template
> context, ctx.app)
>   File "/lib/python2.7/site-packages/flask/templating.py", line 116, in
> _render
> rv = template.render(context)
>   File "/lib/python2.7/site-packages/jinja2/environment.py", line 989, in
> render
> return self.environment.handle_exception(exc_info, True)
>   File "/lib/python2.7/site-packages/jinja2/environment.py", line 754, in
> handle_exception
> reraise(exc_type, exc_value, tb)
>   File 
> "/lib/python2.7/site-packages/airflow/www/templates/airflow/variable_list.html",
> line 18, in top-level template code
> {% extends 'admin/model/list.html' %}
>   File "/lib/python2.7/site-packages/flask_admin/templates/
> bootstrap3/admin/model/list.html", line 6, in top-level template code
> {% import 'admin/model/row_actions.html' as row_actions with context %}
>   File "/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
> line 18, in top-level template code
> {% extends 'admin/base.html' %}
>   File 
> "/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html",
> line 30, in top-level template code
> {% block page_body %}
>   File "/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
> line 96, in block "page_body"
> {% block body %}
>   File "/lib/python2.7/site-packages/flask_admin/templates/
> bootstrap3/admin/model/list.html", line 62, in block "body"
> {% block model_list_table %}
>   File "/lib/python2.7/site-packages/flask_admin/templates/
> bootstrap3/admin/model/list.html", line 110, in block "model_list_table"
> {% block list_row scoped %}
>   File "/lib/python2.7/site-packages/flask_admin/templates/
> bootstrap3/admin/model/list.html", line 138, in block "list_row"
> {{ get_value(row, c) }}
>   File "/lib/python2.7/site-packages/flask_admin/model/base.py", line
> 1742, in get_list_value
> self.column_type_formatters,
>   File "/lib/python2.7/site-packages/flask_admin/model/base.py", line
> 1707, in _get_list_value
> value = column_fmt(self, context, model, name)
>   File "/lib/python2.7/site-packages/airflow/www/views.py", line 2122, in
> hidden_field_formatter
> if should_hide_value_for_key(model.key):
>   File "/lib/python2.7/site-packages/airflow/www/views.py", line 278, in
> should_hide_value_for_key
> return any(s in key_name for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
>   File "/lib/python2.7/site-packages/airflow/www/views.py", line 278, in
> 
> return any(s in key_name for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
> TypeError: argument of type 'NoneType' is not iterable
>
>


Re: Upgrading to 1.8.2 fails to display variable page

2017-09-07 Thread David Capwell
Sorry, this email was me hitting send early.  I sent another email with the
details.

On Thu, Sep 7, 2017 at 9:13 AM, Ash Berlin-Taylor <
ash_airflowl...@firemirror.com> wrote:

> Was it included as an image? If so it was stripped (and would be more
> useful included as text anyway)
>
> -ash
>
> > On 7 Sep 2017, at 17:12, David Capwell  wrote:
> >
> > I just upgraded a test environment from 1.8.0 to 1.8.2 and notice that
> the
> > variables page is no longer able to load.
> >
> > The stacktrace is defined below
>
>


Upgrading to 1.8.2 fails to load variables page

2017-09-07 Thread David Capwell
I just deployed 1.8.2 to a test cluster that was running 1.8.0 and the
below stacktrace is all I get when I try to view the variables page

Looking at it and searching JIRA I found
https://issues.apache.org/jira/browse/AIRFLOW-1200, which looks like it's
trying to block things from being created, but doesn't seem to handle it when
the DB has something it doesn't like.

Traceback (most recent call last):
  File "/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
  File "/lib/python2.7/site-packages/flask/app.py", line 1641, in
full_dispatch_request
rv = self.handle_user_exception(e)
  File "/lib/python2.7/site-packages/flask/app.py", line 1544, in
handle_user_exception
reraise(exc_type, exc_value, tb)
  File "/lib/python2.7/site-packages/flask/app.py", line 1639, in
full_dispatch_request
rv = self.dispatch_request()
  File "/lib/python2.7/site-packages/flask/app.py", line 1625, in
dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File "/lib/python2.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
  File "/lib/python2.7/site-packages/flask_admin/base.py", line 368, in
_run_view
return fn(self, *args, **kwargs)
  File "/lib/python2.7/site-packages/flask_admin/model/base.py", line 1900,
in index_view
return_url=self._get_list_url(view_args),
  File "/lib/python2.7/site-packages/flask_admin/base.py", line 308, in
render
return render_template(template, **kwargs)
  File "/lib/python2.7/site-packages/flask/templating.py", line 134, in
render_template
context, ctx.app)
  File "/lib/python2.7/site-packages/flask/templating.py", line 116, in
_render
rv = template.render(context)
  File "/lib/python2.7/site-packages/jinja2/environment.py", line 989, in
render
return self.environment.handle_exception(exc_info, True)
  File "/lib/python2.7/site-packages/jinja2/environment.py", line 754, in
handle_exception
reraise(exc_type, exc_value, tb)
  File
"/lib/python2.7/site-packages/airflow/www/templates/airflow/variable_list.html",
line 18, in top-level template code
{% extends 'admin/model/list.html' %}
  File
"/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
line 6, in top-level template code
{% import 'admin/model/row_actions.html' as row_actions with context %}
  File
"/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
line 18, in top-level template code
{% extends 'admin/base.html' %}
  File
"/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html",
line 30, in top-level template code
{% block page_body %}
  File
"/lib/python2.7/site-packages/airflow/www/templates/admin/master.html",
line 96, in block "page_body"
{% block body %}
  File
"/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
line 62, in block "body"
{% block model_list_table %}
  File
"/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
line 110, in block "model_list_table"
{% block list_row scoped %}
  File
"/lib/python2.7/site-packages/flask_admin/templates/bootstrap3/admin/model/list.html",
line 138, in block "list_row"
{{ get_value(row, c) }}
  File "/lib/python2.7/site-packages/flask_admin/model/base.py", line 1742,
in get_list_value
self.column_type_formatters,
  File "/lib/python2.7/site-packages/flask_admin/model/base.py", line 1707,
in _get_list_value
value = column_fmt(self, context, model, name)
  File "/lib/python2.7/site-packages/airflow/www/views.py", line 2122, in
hidden_field_formatter
if should_hide_value_for_key(model.key):
  File "/lib/python2.7/site-packages/airflow/www/views.py", line 278, in
should_hide_value_for_key
return any(s in key_name for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
  File "/lib/python2.7/site-packages/airflow/www/views.py", line 278, in

return any(s in key_name for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
TypeError: argument of type 'NoneType' is not iterable


Upgrading to 1.8.2 fails to display variable page

2017-09-07 Thread David Capwell
I just upgraded a test environment from 1.8.0 to 1.8.2 and notice that the
variables page is no longer able to load.

The stacktrace is defined below


Re: As history grows UI gets slower

2017-08-29 Thread David Capwell
So if I clean up the DB for anything older than 30 days, wouldn't the
scheduler try to backfill?

On Aug 29, 2017 11:02 AM, "David Capwell"  wrote:

> Thanks, will take a look at this project
>
> On Aug 29, 2017 10:35 AM, "Chris Riccomini"  wrote:
>
>> Might have a look at this, too:
>>
>> https://github.com/teamclairvoyant/airflow-maintenance-dags
>>
>> I haven't used it, but it seems to have a DB cleaning script.
>>
>> On Mon, Aug 28, 2017 at 3:43 PM, Maxime Beauchemin <
>> maximebeauche...@gmail.com> wrote:
>>
>> > Just make sure to archive based on start_date and not execution_date to
>> > allow backfills.
>> >
>> > Max
>> >
>> > On Mon, Aug 28, 2017 at 3:20 PM, Alex Guziel > > invalid
>> > > wrote:
>> >
>> > > Here at Airbnb we delete old "completed" task instances.
>> > >
>> > > On Mon, Aug 28, 2017 at 3:01 PM, David Capwell 
>> > wrote:
>> > >
>> > > > We are on 1.8.0 and have a monitor DAG that monitors the health of
>> > > > Airflow and Celery every minute.  This has been running for a while
>> > > > now and is at 26k dag runs. We see that the UI for this DAG is
>> > > > multiple seconds slower (6-7 seconds) than any other DAG.
>> > > >
>> > > > My question is, what do people do about managing history as it grows
>> > over
>> > > > time? Do people delete history after N or so days?
>> > > >
>> > > > Thanks for your time reading this email
>> > > >
>> > >
>> >
>>
>
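
To Maxime's point about archiving on start_date rather than execution_date, a
cleanup pass could look roughly like the sketch below (Airflow 1.8-style
models; the 30-day retention window is an assumption, and this should be
tested against a copy of the DB first):

from datetime import datetime, timedelta

from airflow import settings
from airflow.models import TaskInstance

RETENTION = timedelta(days=30)  # assumption: 30-day retention window

session = settings.Session()
cutoff = datetime.utcnow() - RETENTION

# Purge task instances that *started* more than RETENTION ago; filtering on
# start_date rather than execution_date keeps recent backfills of old
# execution dates intact.
deleted = (
    session.query(TaskInstance)
    .filter(TaskInstance.start_date < cutoff)
    .delete(synchronize_session=False)
)
session.commit()
print("deleted {} task instances".format(deleted))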


Re: As history grows UI gets slower

2017-08-29 Thread David Capwell
Thanks, will take a look at this project

On Aug 29, 2017 10:35 AM, "Chris Riccomini"  wrote:

> Might have a look at this, too:
>
> https://github.com/teamclairvoyant/airflow-maintenance-dags
>
> I haven't used it, but it seems to have a DB cleaning script.
>
> On Mon, Aug 28, 2017 at 3:43 PM, Maxime Beauchemin <
> maximebeauche...@gmail.com> wrote:
>
> > Just make sure to archive based on start_date and not execution_date to
> > allow backfills.
> >
> > Max
> >
> > On Mon, Aug 28, 2017 at 3:20 PM, Alex Guziel  > invalid
> > > wrote:
> >
> > > Here at Airbnb we delete old "completed" task instances.
> > >
> > > On Mon, Aug 28, 2017 at 3:01 PM, David Capwell 
> > wrote:
> > >
> > > > We are on 1.8.0 and have a monitor DAG that monitors the health of
> > > > Airflow and Celery every minute.  This has been running for a while
> > > > now and is at 26k dag runs. We see that the UI for this DAG is
> > > > multiple seconds slower (6-7 seconds) than any other DAG.
> > > >
> > > > My question is, what do people do about managing history as it grows
> > over
> > > > time? Do people delete history after N or so days?
> > > >
> > > > Thanks for your time reading this email
> > > >
> > >
> >
>


As history grows UI gets slower

2017-08-28 Thread David Capwell
We are on 1.8.0 and have a monitor DAG that monitors the health of Airflow
and Celery every minute.  This has been running for a while now and is at 26k
dag runs. We see that the UI for this DAG is multiple seconds slower (6-7
seconds) than any other DAG.

My question is, what do people do about managing history as it grows over
time? Do people delete history after N or so days?

Thanks for your time reading this email


Re: Pools and extra capacity?

2017-08-17 Thread David Capwell
Thanks for the details; yeah was expecting this behavior

On Aug 17, 2017 12:36 PM, "Maxime Beauchemin" 
wrote:

> The point of pools is to limit parallelism on a logical set of task
> instances to a certain number. Overflowing into another pool would break
> the only guarantee it provides.
>
> `priority_weight` works along with pool to define which task should be
> scheduled first once slots open up. It won't kill any other tasks if higher
> priority tasks show up, it just re-orders the queue.
>
> Max
>
> On Wed, Aug 16, 2017 at 10:47 PM, David Capwell 
> wrote:
>
> > I'm looking into pools and had a few questions
> >
> > Let's say I have two pools, each with 50% of the cluster.  If one pool is
> > at capacity and has a backlog, but the other pool is idle, will airflow
> > allow the first pool's work to start consuming the slots from the idle
> > pool, and if so, is there preemption to rectify this if needed? Since the
> > docs don't cover this I expect that this is not supported?
> >
> > Also, let's say I have a DAG that I need to always run regardless of
> > whether there are free slots or not; how could I accomplish this? The
> > best I can see is pools, making one just slightly smaller than the full
> > cluster.
> >
> > Thanks for your time reading this email
> >
>
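
For reference, pool and priority_weight are ordinary task arguments, so the
setup Maxime describes looks roughly like the sketch below; the pool name,
DAG id, and commands are illustrative, and the pool itself (with its slot
count) still has to be created via the UI or CLI:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="pool_example",
    start_date=datetime(2017, 8, 1),
    schedule_interval="@daily",
)

# Both tasks share the same pool; the one with the higher priority_weight is
# picked first when slots free up, but it never preempts a running task.
important = BashOperator(
    task_id="important_job",
    bash_command="echo running the important job",
    pool="etl_pool",
    priority_weight=10,
    dag=dag,
)

routine = BashOperator(
    task_id="routine_job",
    bash_command="echo running the routine job",
    pool="etl_pool",
    priority_weight=1,
    dag=dag,
)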


Pools and extra capacity?

2017-08-16 Thread David Capwell
I'm looking into pools and had a few questions

Let's say I have two pools, each with 50% of the cluster.  If one pool is at
capacity and has a backlog, but the other pool is idle, will airflow allow
the first pool's work to start consuming the slots from the idle pool, and
if so, is there preemption to rectify this if needed? Since the docs don't
cover this I expect that this is not supported?

Also, let's say I have a DAG that I need to always run regardless of whether
there are free slots or not; how could I accomplish this? The best I can see
is pools, making one just slightly smaller than the full cluster.

Thanks for your time reading this email


Re: Tasks stay queued when they fail in celery

2017-08-05 Thread David Capwell
Thanks for the details!

On Aug 4, 2017 10:21 AM, "George Leslie-Waksman"
 wrote:

> We've seen this before as well, it's a bug in the Celery Executor that has
> a bunch of different manifestations.
>
> There is at least one open issue relating to this bug:
> https://issues.apache.org/jira/browse/AIRFLOW-1463
>
> I have been working on a fix but it's likely to be a few more days before I
> have a chance to make some progress.
>
> --George
>
> On Fri, Jul 28, 2017 at 5:05 PM David Capwell  wrote:
>
> > We noticed that in the past few days we keep seeing tasks stay in the
> > queued state.  Looking into celery, we see that the task had failed.
> >
> > Traceback (most recent call last):
> >   File "/python/lib/python2.7/site-packages/celery/app/trace.py", line
> > 367, in trace_task
> > R = retval = fun(*args, **kwargs)
> >   File "/python/lib/python2.7/site-packages/celery/app/trace.py", line
> > 622, in __protected_call__
> > return self.run(*args, **kwargs)
> >   File
> > "/python/lib/python2.7/site-packages/airflow/executors/
> celery_executor.py",
> > line 59, in execute_command
> > raise AirflowException('Celery command failed')
> > AirflowException: Celery command failed
> >
> >
> > Why does airflow not learn about this and recover? And what can we do to
> > prevent this?
> >
> > Thanks for your time reading this email.
> >
>
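
Until that fix lands, one pragmatic workaround is to periodically look for
task instances sitting in the queued state and alert on (or clear) them by
hand; a rough sketch follows, using Airflow 1.8-style models, and running it
from a monitoring script is an assumption rather than an official
recommendation:

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()

# List task instances currently sitting in "queued"; in practice you would
# also check how long they have been queued before alerting.
stuck = (
    session.query(TaskInstance)
    .filter(TaskInstance.state == State.QUEUED)
    .all()
)
for ti in stuck:
    print("queued: {} {} {}".format(ti.dag_id, ti.task_id, ti.execution_date))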


Tasks stay queued when they fail in celery

2017-07-28 Thread David Capwell
We noticed that in the past few days we keep seeing tasks stay in the
queued state.  Looking into celery, we see that the task had failed.

Traceback (most recent call last):
  File "/python/lib/python2.7/site-packages/celery/app/trace.py", line
367, in trace_task
R = retval = fun(*args, **kwargs)
  File "/python/lib/python2.7/site-packages/celery/app/trace.py", line
622, in __protected_call__
return self.run(*args, **kwargs)
  File 
"/python/lib/python2.7/site-packages/airflow/executors/celery_executor.py",
line 59, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed


Why does airflow not learn about this and recover? And what can we do to
prevent this?

Thanks for your time reading this email.


Re: Hooks and connection

2017-06-28 Thread David Capwell
Thanks!  Yeah, using http type for now and things are working just fine

On Jun 28, 2017 12:17 PM, "Rutherford, James" <
james.rutherf...@maplecroft.com> wrote:

> Hi David,
>
> I’m also a bit of an Airflow novice but what I’ve figured out is that
> Connections are essentially parameters for Hooks. The connection type
> doesn't seem to be used for anything explicitly (the list of choices is
> hard-coded in the Airflow source).
>
> For example, I made a SharePoint hook that defines how content goes into
> and comes out of SharePoint. The connection information (URL, protocol,
> credentials, etc) is stored as a Connection with type set as 'Samba' (it
> was the closest I could find); the Hook just wraps the SharePoint API.
>
>
> I hope that helps.
>
> Cheers,
>
> Jim
>
>
>
> On 28/06/2017, 19:05, "David Capwell"  wrote:
>
> >I'm just starting out with airflow and looking to add my own artifactory
> >hook so my tasks can pull from there.
> >
> >Looking at the docs this means I need an ArtifactoryHook, but it's not
> >clear to me how this integrates with connections.  Looking over the
> >connection code, the mapping is hard-coded, but the plug-in docs say to
> >just build a hook.  I have a hello-world hook but am not sure how to link
> >it with connections (the connection tab does not list it).
> >
> >Thanks for your time reading this email
>
> 
>
> This email is intended solely for the recipient. It may contain
> privileged, proprietary or confidential information or material. If you are
> not the intended recipient, please delete this email and any attachments
> and notify the sender of the error.
>
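
For reference, the pattern Jim describes (a custom hook that just pulls its
parameters out of whatever Connection you registered, regardless of the
connection type shown in the UI) looks roughly like this sketch; the conn_id,
the connection fields used, and the Artifactory URL layout are illustrative
assumptions:

import requests

from airflow.hooks.base_hook import BaseHook


class ArtifactoryHook(BaseHook):
    """Toy hook that reads host/login/password from an Airflow Connection."""

    def __init__(self, artifactory_conn_id="artifactory_default"):
        self.conn_id = artifactory_conn_id

    def download(self, repo_path):
        # The connection type picked in the UI can be anything (e.g. HTTP);
        # only the host, login, and password fields are used here.
        conn = self.get_connection(self.conn_id)
        url = "https://{host}/artifactory/{path}".format(
            host=conn.host, path=repo_path)
        resp = requests.get(url, auth=(conn.login, conn.password))
        resp.raise_for_status()
        return resp.content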


Hooks and connection

2017-06-28 Thread David Capwell
I'm just starting out with airflow and looking to add my own artifactory
hook so my tasks can pull from there.

Looking at the docs this means I need an ArtifactoryHook, but it's not clear
to me how this integrates with connections.  Looking over the connection
code, the mapping is hard-coded, but the plug-in docs say to just build a
hook.  I have a hello-world hook but am not sure how to link it with
connections (the connection tab does not list it).

Thanks for your time reading this email