Skip task

2016-11-08 Thread Maycock, Luke
Hi All,


I am using Airflow 1.7.1.3 and have a particular requirement, which I don't 
think is currently supported by Airflow, but I just wanted to check in case I 
was missing something.


I occasionally wish to skip a particular task in a given DAG run such that the 
task does not run for that DAG run. Is this functionality available in Airflow?


I am aware of the BranchPythonOperator 
(https://airflow.incubator.apache.org/concepts.html#branching) but I don't 
believe this is exactly what I am looking for.


I am thinking that a button in the UI alongside the 'Mark Success' and 'Run' 
buttons would be appropriate.


If the functionality does not exist, does anyone have any suggestions on ways 
to implement this?


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com
www.oliverwyman.com



This e-mail and any attachments may be confidential or legally privileged. If 
you received this message in error or are not the intended recipient, you 
should destroy the e-mail message and any attachments or copies, and you are 
prohibited from retaining, distributing, disclosing or using any information 
contained herein. Please inform us of the erroneous delivery by return e-mail. 
Thank you for your cooperation.


Re: Skip task

2016-11-08 Thread Gerard Toonstra
Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you an
example.

https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py

You'd have to modify this to your needs, but the way it works is that if
the condition evaluates to False, none of the downstream tasks are actually
executed; they are skipped instead. The reason for putting them into the
SKIPPED state is that the final result of the DAG run is still SUCCESS
rather than FAILED.

You could copy the operator from there and, instead of running the full
"for loop", only pick the tasks immediately downstream of this operator and
skip those. Or, if you need to skip tasks further downstream, add a
parameter "num_tasks" that sets a halting condition for the for loop.

I believe that should work. I haven't tried it, but you can test it and
see what it does for you.
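As a rough illustration of the depth-limited skip described above, here is a toy model in plain Python, not Airflow's actual ShortCircuitOperator code. The DAG is assumed to be a dict mapping each task id to its direct downstream task ids, and `max_depth` is a hypothetical stand-in for the "num_tasks" halting condition.

```python
from collections import deque


def tasks_to_skip(downstream, start, condition, max_depth=None):
    """Return the task ids a short-circuit gate at `start` would mark SKIPPED.

    downstream: dict of task id -> list of direct downstream task ids
                (a toy stand-in for a DAG, not Airflow's data model).
    condition:  the gate's result; True lets everything run, False skips.
    max_depth:  hypothetical halting condition in the spirit of "num_tasks";
                None walks the whole downstream subgraph, 1 skips only the
                tasks immediately downstream.
    """
    if condition:
        return set()  # condition holds: nothing is skipped

    skipped = set()
    # Breadth-first walk, so each task is first reached at its shortest depth.
    queue = deque((child, 1) for child in downstream.get(start, []))
    while queue:
        task, depth = queue.popleft()
        if task in skipped or (max_depth is not None and depth > max_depth):
            continue
        skipped.add(task)
        for child in downstream.get(task, []):
            queue.append((child, depth + 1))
    return skipped


# Example: a gate feeding two loads that both feed a report.
EXAMPLE_DAG = {
    "gate": ["load_a", "load_b"],
    "load_a": ["report"],
    "load_b": ["report"],
    "report": [],
}
print(sorted(tasks_to_skip(EXAMPLE_DAG, "gate", condition=False, max_depth=1)))
# prints ['load_a', 'load_b']
```

With `max_depth=None` the report task would be skipped as well, which corresponds to the unmodified "full for loop" behaviour.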


If you want this as a UI capability, for example to have a human operator
decide whether or not to skip the task, then the best way forward may be a
highly custom plugin with its own view. In the end, you'd basically do the
same thing in the backend, whether the Python condition triggers the skip
or the button is clicked.

In the plugin case though, you'd have to keep the UI and the structure of
the DAG in sync and aligned, otherwise it'd become a mess. Airflow wasn't
really developed for workflows with human interaction, but for workflows
where only automated processes are involved. That doesn't mean that you
can't do anything like that, but it may be costly in terms of resources to
get it done. For example, on the basis of the BranchPythonOperator, you
could call an external API to verify whether a decision was taken on a
case, then follow branch A or B if the decision is there, or put the state
back into UP_FOR_RETRY if it isn't.
At the moment though, there's no programmatic way to reschedule that task
to some minutes or hours into the future before it's looked at again,
unless you really dive into Airflow's scheduling semantics (@once vs.
other schedules) and how the scheduler works.
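The decision check outlined above could look roughly like the callable below. This is a sketch under assumptions: `fetch_decision` is a hypothetical client for the external API, `branch_a`/`branch_b` are placeholder task ids, and raising an exception is assumed to hand control to the task's normal retry handling so that it lands in UP_FOR_RETRY.

```python
class DecisionPending(Exception):
    """Hypothetical error raised while no human decision exists yet."""


def choose_branch(case_id, fetch_decision):
    """Return the task id of the branch to follow for `case_id`.

    fetch_decision: hypothetical callable wrapping the external API; it
    returns "approve", "reject", or None when nobody has decided yet.
    """
    decision = fetch_decision(case_id)
    if decision == "approve":
        return "branch_a"
    if decision == "reject":
        return "branch_b"
    if decision is None:
        # Raising fails this attempt; the task's retry settings decide
        # whether and when it is attempted again.
        raise DecisionPending(f"no decision recorded yet for case {case_id!r}")
    raise ValueError(f"unexpected decision {decision!r}")
```

In a DAG this would be the python_callable of a BranchPythonOperator. How long it waits between attempts is then governed by fixed task settings such as `retries` and `retry_delay`, not by the callable, which matches the limitation noted above.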

Rgds,

Gerard






Re: Skip task

2016-11-09 Thread Maycock, Luke
Hi Gerard,


Thank you for your quick response.


I am not trying to implement this for a specific operator but rather trying to 
add it as a feature for any task in any DAG.


Given that the skipped state propagates to tasks whose directly upstream 
tasks are all skipped, I don't think it is the state we want to use. For the 
functionality I'm looking for, I think I'll need to introduce a new status, 
maybe 'disabled'.
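To make the distinction concrete, the toy resolver below contrasts the propagation rule described above for SKIPPED with a hypothetical 'disabled' status that does not propagate. It is not Airflow's scheduler logic; the state names and the rule are taken from this message, and the DAG is assumed to be a dict of task id to direct upstream task ids.

```python
def resolve_states(upstream, order, marked):
    """Toy resolver contrasting a propagating 'skipped' with a proposed 'disabled'.

    upstream: dict task id -> list of direct upstream task ids.
    order:    task ids in topological order.
    marked:   dict task id -> "skipped" or "disabled", set by a user.
    Rule modelled (as described in the thread, not Airflow's scheduler code):
    a task whose direct upstream tasks are all skipped is skipped too, while
    'disabled' counts as done and does not propagate.
    """
    states = {}
    for task in order:
        parents = upstream.get(task, [])
        if task in marked:
            states[task] = marked[task]
        elif parents and all(states[p] == "skipped" for p in parents):
            states[task] = "skipped"
        else:
            states[task] = "run"
    return states


UPSTREAM = {"extract": [], "load": ["extract"], "report": ["load"]}
ORDER = ["extract", "load", "report"]
print(resolve_states(UPSTREAM, ORDER, {"load": "disabled"}))
# prints {'extract': 'run', 'load': 'disabled', 'report': 'run'}
```

Marking `load` as "skipped" instead would also skip `report`, which is the propagation the proposed status is meant to avoid.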


Again, thanks for your response.


Cheers,
Luke Maycock






Re: Skip task

2016-11-09 Thread Gerard Toonstra
Hey Luke,

Who or what makes the decision to skip processing that task?

Rgds,

Gerard


Re: Skip task

2016-11-10 Thread Maycock, Luke
Hi Gerard,


I see the new status as having a number of uses:

 1.  A user can manually set a task to skip in a DAG run via the UI.
 2.  We can then make use of this new status to add the following
     functionality to Airflow:
     *   Run a DAG run up to a certain point and give the rest of the tasks
         the new status.
     *   Run a DAG run from a certain task to the end, setting all
         prerequisite tasks to the new status.
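Both use cases come down to choosing a set of tasks from the DAG's dependency graph. The following is a minimal sketch under two assumptions. First, the DAG is a dict of task id to direct downstream ids. Second, "run up to" is interpreted as running the chosen task plus its prerequisites, with every other task getting the new status, while "run from" gives only the task's prerequisites the new status. `tasks_to_disable` is a hypothetical helper, not an Airflow function.

```python
def all_tasks(downstream):
    """Every task id mentioned in the toy DAG."""
    tasks = set(downstream)
    for children in downstream.values():
        tasks.update(children)
    return tasks


def ancestors(downstream, task):
    """All tasks that `task` transitively depends on (its prerequisites)."""
    upstream = {}
    for parent, children in downstream.items():
        for child in children:
            upstream.setdefault(child, []).append(parent)
    seen, stack = set(), list(upstream.get(task, []))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(upstream.get(current, []))
    return seen


def tasks_to_disable(downstream, task, mode):
    """Tasks that would receive the proposed new status.

    mode "up_to": run `task` and its prerequisites; disable everything else.
    mode "from":  run `task` and what follows; disable its prerequisites.
    """
    prerequisites = ancestors(downstream, task)
    if mode == "up_to":
        return all_tasks(downstream) - prerequisites - {task}
    if mode == "from":
        return prerequisites
    raise ValueError("mode must be 'up_to' or 'from'")


DAG = {"extract": ["clean"], "clean": ["report"], "audit": ["report"], "report": []}
print(sorted(tasks_to_disable(DAG, "clean", "up_to")))  # prints ['audit', 'report']
```

Whether unrelated parallel branches (like `audit` here) should also be disabled is a design choice the feature would have to settle.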

I am happy to be challenged on the above use cases if there are better ways to 
achieve the same things.

Cheers,
Luke Maycock





Re: Skip task

2016-11-14 Thread siddharth anand
For cases like this, we (Agari) use the following approach:

   1. Create a Variable in the UI that holds a boolean flag, such as
   *enable_feature_x*
   2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip
   downstream processing based on the value of *enable_feature_x*
   3. Assuming that you don't want to skip ALL downstream tasks, you can
   use a trigger_rule of all_done to resume processing some portion of your
   downstream DAG after skipping an upstream portion
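Steps 1 and 2 can be sketched as a gate callable. Here a plain dict stands in for Airflow's Variable store; in a real DAG the lookup would go through `Variable.get`. The accepted spellings of "true" are an assumption, since Variables are stored as text.

```python
# A plain dict standing in for Airflow's Variable store (assumption).
VARIABLES = {"enable_feature_x": "false"}

TRUTHY = {"true", "1", "yes", "on"}  # assumed accepted spellings


def parse_flag(raw):
    """Interpret a Variable's text value as a boolean."""
    return str(raw).strip().lower() in TRUTHY


def make_feature_gate(key, store, default=False):
    """Build a ShortCircuitOperator-style callable for the flag `key`.

    Returning True lets downstream tasks run; False makes them skip.
    The store is read on every call, so edits made between runs are seen.
    """
    def gate(**context):  # any task context passed in is ignored
        raw = store.get(key)
        return default if raw is None else parse_flag(raw)
    return gate
```

The returned `gate` is what would be passed as `python_callable` to the ShortCircuitOperator. Flipping the Variable in the UI then changes whether the downstream tasks run on the next DAG run, without editing the DAG file.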

In other words, there is already a means to achieve what you are asking for
today. You can change the value of *enable_feature_x* via the UI. If you'd
like to enhance the UI to better capture this pattern, please submit a PR.
-s


Re: Skip task

2016-11-15 Thread Maycock, Luke
Thank you for taking the time to respond. This is a great approach if you know 
at the time of creating the DAG which tasks you expect to need to skip. 
However, I don't think this is exactly the use case I have. For example, I may 
be expecting a file to arrive in an FTP folder for loading into a database, but 
one day it doesn't arrive, so I just want to skip that task on that day.


Our workflows commonly contain around 20 of these types of tasks. I could 
configure all of these tasks in the way you suggested in case I ever need to 
skip one of them. However, I'd prefer not to have to set the tasks up this way 
and instead have the ability just to skip a task on an ad-hoc basis. I could 
then also use this functionality to add the ability to run from a certain point 
in a DAG or to a certain point in the DAG.



Thanks,
Luke Maycock





Re: Skip task

2016-11-15 Thread siddharth anand
If your requirement is to skip a portion of tasks in a DagRun based on some
state encountered while executing that DagRun, that is what the
BranchPythonOperator or ShortCircuitOperator (optionally paired with a
trigger rule specified on a downstream task) is made for.

These operators take a custom Python callable as an argument. The callable
can check for the existence of data or files that should have been
generated by an external system or an upstream task in the same DAG. The
callable needs to return a Boolean value in the case of the
ShortCircuitOperator, or a selected choice (i.e. the branch to take) in the
case of the BranchPythonOperator.
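For the file-arrival case, the two kinds of callable might look like the sketch below. It assumes the expected file is visible at a local path (for an FTP drop it would first have to be listed or downloaded), and it treats an empty file as not yet arrived. The branch task ids are placeholders.

```python
import os


def file_has_arrived(path):
    """ShortCircuit-style gate: True if the expected file exists and is non-empty."""
    return os.path.isfile(path) and os.path.getsize(path) > 0


def choose_load_branch(path, load_task="load_file", skip_task="no_file_today"):
    """Branch-style callable: return the task id of the branch to follow.

    The default task ids are placeholders; a real DAG would use its own ids.
    """
    return load_task if file_has_arrived(path) else skip_task
```

The first function returns the Boolean a ShortCircuitOperator expects, and the second returns the task id a BranchPythonOperator expects.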

If you have 20 tasks that all depend on the presence of 20 different files,
you would need 20 ShortCircuitOperator or BranchPythonOperator tasks, each
either sharing a common callable or each with its own callable.
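Sharing one callable across many gate tasks can be done by binding each file path with `functools.partial`. The layout below (task ids, the `check_` prefix, file names) is hypothetical:

```python
import os
from functools import partial


def check_file(path):
    """Shared gate logic: True if the expected file is present."""
    return os.path.isfile(path)


def build_gates(expected_files):
    """Map a gate task id to a zero-argument callable, one per expected file.

    expected_files: dict of load task id -> file path (hypothetical layout).
    Each gate would sit directly upstream of its load task.
    """
    return {
        "check_" + task_id: partial(check_file, path)
        for task_id, path in expected_files.items()
    }
```

Using `partial` rather than a lambda inside the comprehension avoids Python's late binding, where every gate would otherwise end up checking the last path. Each callable would be the `python_callable` of one ShortCircuitOperator; passing the path through `op_kwargs` would be an equivalent alternative.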

One could argue that these tasks are "overhead" because they just encompass
some conditional or control logic and that DAGs should only contain
workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
tasks are more of a pure dataflow approach -- i.e. no control-logic
operators. However, I don't see another option.

In the current system, a callable registered with a ShortCircuitOperator
would check for the presence of a file -- if the file were not available,
then a series of downstream tasks would be skipped in that DagRun, until a
task with trigger_rule="all_done" was encountered, downstream of which
tasks would no longer be skipped for the DagRun.
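The skip-then-resume behaviour can be simulated as follows. This is a simplified model, not Airflow's scheduler, and it rests on three assumptions. The gate skips its direct downstream tasks. A task with the default all_success rule is skipped when any upstream task was skipped. An all_done task runs once its upstream tasks have finished, whatever their state.

```python
def run_with_gate(upstream, order, trigger_rules, gate, gate_passed):
    """Simplified model of a short-circuit gate followed by trigger rules.

    upstream:      dict task id -> list of direct upstream task ids.
    order:         task ids in topological order (gate first).
    trigger_rules: dict task id -> "all_success" (default) or "all_done".
    gate:          task id of the short-circuit task.
    gate_passed:   the gate callable's boolean result.
    """
    states = {}
    for task in order:
        parents = upstream.get(task, [])
        rule = trigger_rules.get(task, "all_success")
        if task == gate:
            states[task] = "success"
        elif gate in parents and not gate_passed:
            states[task] = "skipped"  # skipped directly by the gate
        elif rule == "all_success" and any(states[p] == "skipped" for p in parents):
            states[task] = "skipped"  # skip spreads under the default rule
        else:
            states[task] = "success"  # all_done, or all upstream succeeded
    return states


UPSTREAM = {"check_file": [], "load": ["check_file"], "transform": ["load"],
            "cleanup": ["transform"], "notify": ["cleanup"]}
ORDER = ["check_file", "load", "transform", "cleanup", "notify"]
RULES = {"cleanup": "all_done"}
```

With the gate failing, `load` and `transform` come out skipped while `cleanup` (all_done) and everything after it runs, mirroring the behaviour described above.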

I hope this makes sense.

A long time ago, I proposed functionality to skip a series of DAG runs
via the UI, because I knew that no data was available for that time range
from an external system. I essentially wanted to specify a "blackout"
period in terms of a time range that covered multiple DagRuns. My intention
was for backfills to skip those days. It turns out that my company did not
end up having such a requirement, so I dropped the feature request.

If this is what you are asking for, then I am +1. Please implement it and
submit a PR.
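A blackout period of the kind described above could be checked against each execution date. The sketch below is hypothetical. It assumes a daily schedule and treats the inclusive date ranges as a user setting that does not currently exist.

```python
from datetime import date, timedelta


def in_blackout(execution_date, blackouts):
    """True if `execution_date` falls inside any inclusive (start, end) range."""
    return any(start <= execution_date <= end for start, end in blackouts)


def dates_to_backfill(start, end, blackouts):
    """Daily execution dates from start to end (inclusive), minus blackouts."""
    result = []
    current = start
    while current <= end:
        if not in_blackout(current, blackouts):
            result.append(current)
        current += timedelta(days=1)
    return result


# Hypothetical blackout: the external system had no data for 3-5 November 2016.
BLACKOUTS = [(date(2016, 11, 3), date(2016, 11, 5))]
```

Inside a DAG, the same `in_blackout` check could back a short-circuit gate by testing the run's execution date from the task context.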


Re: Skip task

2016-12-08 Thread Maycock, Luke
rtTrue(exclusion)


The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have 
checked, and the 'exclusion' variable contains a TaskExclusion object for 
PostgreSQL and SQLite but is set to 'None' for MySQL. Any suggestions on what 
could be causing this would be much appreciated.


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
www.oliverwyman.com<http://www.oliverwyman.com/>




From: siddharth anand 
Sent: 16 November 2016 00:40
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

If your requirement is to skip a portion of tasks in a DagRun based on some
state encountered while executing that DagRun, that is what
BranchPythonOperator or ShortCircuitOperator (optionally paired with a
Trigger Rule specified on a downstream task) are made for.

These operators take a custom Python callable as an argument. The callable
can check for the existence of data or files that should have been
generated by an external system or an upstream task in the same DAG. The
callable needs to return a Boolean value in the case of the
ShortCircuitOperator, or a selected choice (i.e. the branch to take) in the
case of the BranchPythonOperator.
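For a single expected file, the two kinds of callable described above might look like this. This is a minimal sketch. The path and the task_ids ('load_file', 'skip_load') are hypothetical. In a DAG, they would be bound through the operator's op_args/op_kwargs.

```python
import os


def file_has_arrived(path):
    """Callable for a ShortCircuitOperator.

    True lets the DAG run continue; False makes the operator skip the tasks
    downstream of it.
    """
    return os.path.isfile(path)


def choose_branch(path, load_task_id, skip_task_id):
    """Callable for a BranchPythonOperator.

    Returns the task_id of the branch to follow; the other branch is skipped.
    """
    return load_task_id if os.path.isfile(path) else skip_task_id
```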

If you have 20 tasks that all depend on the presence of 20 different files,
you would need 20 ShortCircuitOperator or BranchPythonOperator tasks, each
either sharing a common callable or having its own callable.
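For the 20-file case, one shared check can be stamped out once per file. This is a minimal sketch that assumes each workhorse task maps to exactly one expected file. The names make_file_gate and build_gates, and the check_<task_id> naming, are all hypothetical.

```python
import os
from typing import Callable, Dict


def make_file_gate(path: str) -> Callable[..., bool]:
    """Return a ShortCircuitOperator-style callable bound to a single path.

    Building the closure in a separate function binds `path` per gate, which
    avoids the classic late-binding bug of defining lambdas inside a loop.
    """
    def gate(*args, **kwargs) -> bool:
        # Any arguments the operator passes in are ignored; only the file matters.
        return os.path.isfile(path)
    return gate


def build_gates(paths_by_task: Dict[str, str]) -> Dict[str, Callable[..., bool]]:
    """Map a hypothetical gate task_id ("check_<task_id>") to its callable."""
    return {"check_" + task_id: make_file_gate(path)
            for task_id, path in paths_by_task.items()}
```

Each entry would then become a ShortCircuitOperator (task_id=gate id, python_callable=gate) placed directly upstream of its load task. A missing file would then skip only that branch.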

One could argue that these tasks are "overhead" because they just encompass
some conditional or control logic and that DAGs should only contain
workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
tasks are more of a pure dataflow approach -- i.e. no control-logic
operators. However, I don't see another option.

In the current system, a callable registered with a ShortCircuitOperator
would check for the presence of a file -- if the file were not available,
then a series of downstream tasks would be skipped in that DagRun, until a
task with trigger_rule="all_done" was encountered, downstream of which,
tasks would no longer be skipped for the DagRun.

I hope this makes sense.

A long time ago, I proposed UI functionality to skip a series of DAG runs,
because I knew that no data was available for that time range
from an external system. I wanted to essentially specify a "blackout"
period in terms of a time range that covered multiple DagRuns. My intention
was for backfills to skip those days. It turned out that my company did not
end up having such a requirement, so I dropped the feature request.

If this is what you are asking for, then I am +1. Please implement it and
submit a PR.

On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <
luke.mayc...@affiliate.oliverwyman.com> wrote:

> Thank you for taking the time to respond. This is a great approach if you
> know at the time of creating the DAG which tasks you expect to need to
> skip. However, I don't think this is exactly the use case I have. For
> example, I may be expecting a file to arrive in an FTP folder for loading
> into a database but one day it doesn't arrive so I just want to skip that
> task on that day.
>
>
> Our workflows commonly have around 20 of these types of tasks in. I could
> configure all of these tasks in the way you suggested in case I ever need
> to skip one of them. However, I'd prefer not to have to set the tasks up
> this way and instead have the ability just to skip a task on an ad-hoc
> basis. I could then also use this functionality to add the ability to run
> from a certain point in a DAG or to a certain point in the DAG.

Re: Skip task

2016-12-09 Thread Maycock, Luke
I found the issue to be that, for MySQL, the datetime was being rounded to the 
nearest second. The strange thing is that if a datetime without the 
microseconds was passed to SQLAlchemy, the insertion into MySQL failed; but 
when a datetime with microseconds was passed, the microseconds were removed by 
rounding to the nearest second.
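The rounding described above would also explain the None in the unit test. This assumes the test reads the row back by filtering on the same exec_date, which is in the truncated part of the test. The sketch below does not talk to MySQL. It only mimics, in plain Python, a DATETIME column without fractional seconds that rounds to the nearest second. The function name is hypothetical.

```python
from datetime import datetime, timedelta


def store_without_fraction(value: datetime) -> datetime:
    """Mimic a DATETIME column with no fractional-second precision that
    rounds (rather than truncates) to the nearest second."""
    rounded = value.replace(microsecond=0)
    if value.microsecond >= 500000:
        rounded += timedelta(seconds=1)
    return rounded


exec_date = datetime(2016, 12, 8, 10, 44, 32, 700000)
stored = store_without_fraction(exec_date)
# An equality filter on exec_date compares against the rounded value, so it matches nothing.
print(stored == exec_date)  # False

# Dropping microseconds before storing makes the round trip stable.
safe = exec_date.replace(microsecond=0)
print(store_without_fraction(safe) == safe)  # True
```

Stripping microseconds in the test is only a workaround. The fix Bolke suggests in this thread is to declare the columns as mysql.DATETIME(fsp=6) on MySQL 5.6.4 and later, so that the fraction is kept.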


Hopefully, this will prevent someone else going down the same rabbit hole that 
I did.


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
www.oliverwyman.com<http://www.oliverwyman.com/>



From: Maycock, Luke 
Sent: 08 December 2016 10:44:32
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

Hi All,


We have implemented a solution for allowing the exclusion of individual tasks 
during a DAG run. However, when writing unit tests for this, we are 
encountering an issue with MySQL, which I am hoping someone is able to help us 
with.


For our solution, we have a new 'TaskExclusion' table in the meta-data. Our 
unit tests were run by Travis, not locally.


The code block under test:


# Imports as they might look in a standalone module; if this model lives
# inside airflow/models.py, Base and ID_LEN are already defined there.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String

from airflow.exceptions import AirflowException
from airflow.models import Base, ID_LEN
from airflow.utils.db import provide_session

# TaskExclusionType (SINGLE_DATE, DATE_RANGE, INDEFINITE) is defined elsewhere
# in the PR and is not shown in this email.


class TaskExclusion(Base):
    """
    This class is used to define objects that can be used to specify not to
    run a given task in a given dag on a variety of execution date conditions.
    These objects will be stored in the backend database in the task_exclusion
    table.
    Static methods are provided for the creation, removal and investigation of
    these objects.
    """

    __tablename__ = "task_exclusion"

    id = Column(Integer(), primary_key=True)
    dag_id = Column(String(ID_LEN), nullable=False)
    task_id = Column(String(ID_LEN), nullable=False)
    exclusion_type = Column(String(32), nullable=False)
    exclusion_start_date = Column(DateTime, nullable=True)
    exclusion_end_date = Column(DateTime, nullable=True)
    created_by = Column(String(256), nullable=False)
    created_on = Column(DateTime, nullable=False)

    @classmethod
    @provide_session
    def set(
            cls,
            dag_id,
            task_id,
            exclusion_type,
            exclusion_start_date,
            exclusion_end_date,
            created_by,
            session=None):
        """
        Add a task exclusion to prevent a task running under certain
        circumstances.
        :param dag_id: The dag_id of the DAG containing the task to exclude
        from execution.
        :param task_id: The task_id of the task to exclude from execution.
        :param exclusion_type: The type of circumstances to exclude the task
        from execution under. See the TaskExclusionType class for more detail.
        :param exclusion_start_date: The execution_date to start excluding on.
        This will be ignored if the exclusion_type is INDEFINITE.
        :param exclusion_end_date: The execution_date to stop excluding on.
        This will be ignored if the exclusion_type is INDEFINITE or
        SINGLE_DATE.
        :param created_by: Who is creating this exclusion. Stored with the
        exclusion record for auditing/debugging purposes.
        :return: None.
        """

        session.expunge_all()

        # Set up execution date range correctly
        if exclusion_type == TaskExclusionType.SINGLE_DATE:
            if exclusion_start_date:
                exclusion_end_date = exclusion_start_date
            else:
                raise AirflowException(
                    "No exclusion_start_date "
                )
        elif exclusion_type == TaskExclusionType.DATE_RANGE:
            # Check for missing dates first; comparing None would raise TypeError
            if exclusion_start_date is None or exclusion_end_date is None:
                raise AirflowException(
                    "A DATE_RANGE exclusion needs both a start and an end date"
                )
            if exclusion_start_date > exclusion_end_date:
                raise AirflowException(
                    "The exclusion_start_date is after the exclusion_end_date"
                )
        elif exclusion_type == TaskExclusionType.INDEFINITE:
            exclusion_start_date = None
            exclusion_end_date = None
        else:
            raise AirflowException(
                "The exclusion_type, {}, is not recognised."
                .format(exclusion_type)
            )

        # remove any duplicate exclusions
        session.query(cls).filter(
            cls.dag_id == dag_id,
            cls.task_id == task_id,
            cls.exclusion_type == exclusion_type,
            cls.exclusion_start_date == exclusion_start_date,
            cls.exclusion_end_date == exclusion_end_date
        ).delete()

        # insert new exclusion
        session.add(cls(
            dag_id=dag_id,
            task_id=task_id,
            exclusion_type=exclusion_type,
            exclusion_start_date=exclusion_start_date,
            exclusion_end_date=exclusion_end_date,
            created_by=created_by,
            created_on=datetime.now())
        )

        session.commit()
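The docstring above describes exclusions in terms of execution dates, but the check that would consult them is not part of this email. Below is a minimal sketch of such a check. It assumes inclusive date ranges and the normalisation done in set(): SINGLE_DATE stores end == start, and INDEFINITE stores NULL for both dates. The string constants stand in for the TaskExclusionType class, which is not shown. The name is_excluded is hypothetical.

```python
from datetime import datetime
from typing import Optional

# Hypothetical stand-ins for TaskExclusionType's values (not shown in the thread).
SINGLE_DATE = "SINGLE_DATE"
DATE_RANGE = "DATE_RANGE"
INDEFINITE = "INDEFINITE"


def is_excluded(exclusion_type: str,
                start: Optional[datetime],
                end: Optional[datetime],
                execution_date: datetime) -> bool:
    """Decide whether one stored exclusion applies to an execution_date.

    Assumes set() has already validated and normalised the row, so start
    and end are only None for INDEFINITE exclusions.
    """
    if exclusion_type == INDEFINITE:
        return True
    if exclusion_type in (SINGLE_DATE, DATE_RANGE):
        # SINGLE_DATE rows have end == start, so one inclusive check covers both.
        return start <= execution_date <= end
    raise ValueError("Unrecognised exclusion_type: {}".format(exclusion_type))
```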


The unit test:

class TaskExclusionTest(unittest.TestCase):
    def test_set_exclusion(self, session=None):

        session = settings.Session()

        session.expunge_all()

        dag_id = 'test_task_exclude'
        task_id = 'test_task_exclude'
        exec_date = datetime.datetime.now()

        TaskExclusion.set(dag_id=dag_id,
   

Re: Skip task

2016-12-09 Thread Bolke de Bruin
What table was this? I recently pushed a fix that allows fractional seconds in 
our minimum supported version of MySQL (5.6.4 and beyond). 

I might have missed something. 

Thanks
Bolke

Sent from my iPhone

> On 9 Dec 2016, at 14:27, Maycock, Luke 
>  wrote:

Re: Skip task

2016-12-12 Thread Maycock, Luke
It is a new table, task_exclusion (mapped by the TaskExclusion model). The 
migration script for it is as follows:

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.create_table(
        'task_exclusion',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        sa.Column('task_id', sa.String(length=250), nullable=False),
        sa.Column('exclusion_type', sa.String(length=32), nullable=False),
        # Nullable to match the model: INDEFINITE exclusions store NULL dates
        sa.Column('exclusion_start_date', sa.DateTime(), nullable=True),
        sa.Column('exclusion_end_date', sa.DateTime(), nullable=True),
        sa.Column('created_by', sa.String(length=256), nullable=False),
        sa.Column('created_on', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'))


def downgrade():
    op.drop_table('task_exclusion')

This is the PR for the exclusion of a task. We review our code internally 
before setting up a PR into the main repo for the next review, hence the PR 
being in our fork. The PR does not yet contain our unit tests.


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
www.oliverwyman.com<http://www.oliverwyman.com/>



____
From: Bolke de Bruin 
Sent: 09 December 2016 20:54
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

What table was this? I recently pushed a fix that allows fractional seconds in 
our minimum supported version of MySQL (5.6.4 and beyond).

I might have missed something.

Thanks
Bolke

Sent from my iPhone


Re: Skip task

2016-12-12 Thread Bolke de Bruin
Have a look at: 
https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py

Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types on 
MySQL.

- Bolke



> On 12 Dec 2016, at 11:33, Maycock, Luke 
>  wrote:

Re: Skip task

2016-12-12 Thread Maycock, Luke
Excellent - thanks for your help Bolke, much appreciated!


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
www.oliverwyman.com<http://www.oliverwyman.com/>




From: Bolke de Bruin 
Sent: 12 December 2016 10:40
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

Have a look at: 
https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py

Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types on 
MySQL.

- Bolke




Re: Skip task

2017-10-18 Thread Or Sher
I found this thread while searching for the exact same feature.
Was your PR ever merged?
Is there any other functionality that handles this in the current release?
(I'm still on 1.7)
I'd really hate using the branch and dummy operators for every task I might
want to skip.

On Mon, Dec 12, 2016 at 1:56 PM Maycock, Luke <
luke.mayc...@affiliate.oliverwyman.com> wrote:

> Excellent - thanks for your help Bolke, much appreciated!
>
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
> www.oliverwyman.com<http://www.oliverwyman.com/>
>
>
>