AirflowTaskTimeout: Timeout: pipeline getting stalled

2017-03-22 Thread harish singh
Hi guys,

So I have airflow 1.8 running at my company now. Overall, performance has
improved and scheduling has been faster.
The jobs are running and the pipeline does progress, but I am running into a
few issues. Please help if you have seen this before. Any help will be
appreciated.


1. Jobs get scheduled -> queued, but never run.
I read an email from Bolke where the suggestion was to increase the size of
the Pools, but this hasn't worked.
When I manually cleared the tasks, I saw airflow run them after clearing.
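
As an aside, queued-but-never-running tasks can also be capped by the
scheduler's concurrency limits rather than by pools. A minimal airflow.cfg
sketch of the knobs worth checking (the values are illustrative, not
recommendations):

[core]
# upper bound on task instances running across the whole installation
parallelism = 32
# upper bound on concurrently running task instances per DAG
dag_concurrency = 16
# slots available to tasks that are not assigned to any pool
non_pooled_task_slot_count = 128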


2. For the same issue above, I saw Timeout errors.
I still haven't been able to understand why this happens.
This is the entire trace:

[2017-03-22 19:35:16,332] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/pipeline/pipeline.py
[2017-03-22 19:35:22,451] {airflow_configuration.py:40} INFO - loading setup.cfg file
[2017-03-22 19:35:51,041] {timeout.py:37} ERROR - Process timed out
[2017-03-22 19:35:51,041] {models.py:266} ERROR - Failed to import: /usr/local/airflow/pipeline/pipeline.py
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/airflow/pipeline/pipeline.py", line 167, in <module>
    create_tasks(dbguid, version, dag, override_start_date)
  File "/usr/local/airflow/pipeline/pipeline.py", line 104, in create_tasks
    t = create_task(dbguid, dag, taskInfo, version, override_date)
  File "/usr/local/airflow/pipeline/pipeline.py", line 85, in create_task
    retries, 1, depends_on_past, version, override_dag_date)
  File "/usr/local/airflow/pipeline/dags/base_pipeline.py", line 90, in create_python_operator
    depends_on_past=depends_on_past)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/decorators.py", line 86, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 65, in __init__
    super(PythonOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/decorators.py", line 70, in wrapper
    sig = signature(func)
  File "/usr/local/lib/python2.7/dist-packages/funcsigs/__init__.py", line 105, in signature
    return Signature.from_function(obj)
  File "/usr/local/lib/python2.7/dist-packages/funcsigs/__init__.py", line 594, in from_function
    __validate_parameters__=False)
  File "/usr/local/lib/python2.7/dist-packages/funcsigs/__init__.py", line 518, in __init__
    for param in parameters))
  File "/usr/lib/python2.7/collections.py", line 52, in __init__
    self.__update(*args, **kwds)
  File "/usr/lib/python2.7/_abcoll.py", line 548, in update
    self[key] = value
  File "/usr/lib/python2.7/collections.py", line 61, in __setitem__
    last[1] = root[0] = self.__map[key] = [last, root, key]
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/timeout.py", line 38, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
AirflowTaskTimeout: Timeout
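
For what it's worth, the trace shows the DAG-file import being killed by the
DagBag's parse timeout (timeout.py fires roughly 30 seconds after "Filling up
the DagBag"). A minimal airflow.cfg sketch of one possible mitigation,
assuming pipeline.py is legitimately slow to parse rather than hung:

[core]
# default is 30 seconds; raise it if the DAG file takes longer to import
dagbag_import_timeout = 120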

3. "_cmd" doesnt work anymore for fetching sqlalchemy_connection.
Even when I am using mysql (connection url doesnt include 'sqlite'
anywhere):
"error: cannot use sqlite with the LocalExecutor"


Re: variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
Thanks Jeremiah, this is exactly what was bugging me. I am going to rewrite
that code and look at persistent storage. Your explanation helped, thanks!

On Wed, Mar 22, 2017 at 2:29 PM, Jeremiah Lowin  wrote:

> In vanilla Python, your DAGs will all reference the same object, so when
> your DAG file is parsed and 200 DAGs are created, there will still only be
> 1 60MB dict object created (I say vanilla because there are obviously ways
> to create copies of the object).
>
> HOWEVER, you should assume that each Airflow TASK is being run in a
> different process, and each process is going to load your DAG file when it
> runs. If resource use is a concern, I suggest you look at out-of-core or
> persistent storage for the object so you don't need to load the whole thing
> every time.
>
> On Wed, Mar 22, 2017 at 11:20 AM Boris Tyukin 
> wrote:
>
> > hi Jeremiah, thanks for the explanation!
> >
> > i am very new to Python so was surprised that it works and my external
> > dictionary object was still accessible to all dags generated. I think it
> > makes sense but I would like to confirm one thing and I do not know how
> to
> > test it myself.
> >
> > do you think that large dictionary object will still be loaded to memory
> > only once even if I generate 200 dags that will be accessing it? so
> > basically they will just use a reference to it or they would create a
> copy
> > of the same 60Mb structure.
> >
> > I hope my question makes sense :)
> >
> > On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin 
> > wrote:
> >
> > > At the risk of oversimplifying things, your DAG definition file is
> loaded
> > > *every* time a DAG (or any task in that DAG) is run. Think of it as a
> > > literal Python import of your dag-defining module: any variables are
> > loaded
> > > along with the DAGs, which are then executed. That's why your dict is
> > > always available. This will work with Celery since it follows the same
> > > approach, parsing your DAG file to run each task.
> > >
> > > (By the way, this is why it's critical that all parts of your Airflow
> > > infrastructure have access to the same DAGS_FOLDER)
> > >
> > > Now it is true that the DagBag loads DAG objects but think of it as
> more
> > of
> > > an "index" so that the scheduler/webserver know what DAGs are
> available.
> > > When it's time to actually run one of those DAGs, the executor loads it
> > > from the underlying source file.
> > >
> > > Jeremiah
> > >
> > > On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a weird question but it bugs my mind. I have some like below
> to
> > > > generate dags dynamically, using Max's example code from FAQ.
> > > >
> > > > It works fine but I have one large dict (let's call it my_outer_dict)
> > > that
> > > > takes over 60Mb in memory and I need to access it from all generated
> > > dags.
> > > > Needless to say, i do not want to recreate that dict for every dag
> as I
> > > > want to load it to memory only once.
> > > >
> > > > To my surprise, if i define that dag outside of my dag definition
> > code, I
> > > > can still access it.
> > > >
> > > > Can someone explain why and where is it stored? I thought only dag
> > > > definitions are loaded to dagbag and not the variables outside it.
> > > >
> > > > Is it even a good practice and will it work still if I switch to
> celery
> > > > executor?
> > > >
> > > >
> > > > def get_dag(i):
> > > > dag_id = 'foo_{}'.format(i)
> > > > dag = DAG(dag_id)
> > > > 
> > > > print my_outer_dict
> > > >
> > > > my_outer_dict = {}
> > > > for i in range(10):
> > > > dag = get_dag(i)
> > > > globals()[dag.dag_id] = dag
> > > >
> > >
> >
>


Re: variable scope with dynamic dags

2017-03-22 Thread Jeremiah Lowin
In vanilla Python, your DAGs will all reference the same object, so when
your DAG file is parsed and 200 DAGs are created, there will still only be
1 60MB dict object created (I say vanilla because there are obviously ways
to create copies of the object).

HOWEVER, you should assume that each Airflow TASK is being run in a
different process, and each process is going to load your DAG file when it
runs. If resource use is a concern, I suggest you look at out-of-core or
persistent storage for the object so you don't need to load the whole thing
every time.
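
A minimal Python 2 sketch of the persistent-storage idea, using the stdlib
shelve module and a hypothetical /usr/local/airflow/lookup.db path: the big
dict is written to disk once, and each task process then reads only the keys
it needs instead of rebuilding all 60MB at import time.

import shelve

# one-off build step, run outside the DAG file:
#   db = shelve.open('/usr/local/airflow/lookup.db')
#   db.update(my_outer_dict)
#   db.close()

def get_entry(key):
    # opened read-only; each task process pulls single keys on demand
    db = shelve.open('/usr/local/airflow/lookup.db', flag='r')
    try:
        return db[key]
    finally:
        db.close()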

On Wed, Mar 22, 2017 at 11:20 AM Boris Tyukin  wrote:

> hi Jeremiah, thanks for the explanation!
>
> i am very new to Python so was surprised that it works and my external
> dictionary object was still accessible to all dags generated. I think it
> makes sense but I would like to confirm one thing and I do not know how to
> test it myself.
>
> do you think that large dictionary object will still be loaded to memory
> only once even if I generate 200 dags that will be accessing it? so
> basically they will just use a reference to it or they would create a copy
> of the same 60Mb structure.
>
> I hope my question makes sense :)
>
> On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin 
> wrote:
>
> > At the risk of oversimplifying things, your DAG definition file is loaded
> > *every* time a DAG (or any task in that DAG) is run. Think of it as a
> > literal Python import of your dag-defining module: any variables are
> loaded
> > along with the DAGs, which are then executed. That's why your dict is
> > always available. This will work with Celery since it follows the same
> > approach, parsing your DAG file to run each task.
> >
> > (By the way, this is why it's critical that all parts of your Airflow
> > infrastructure have access to the same DAGS_FOLDER)
> >
> > Now it is true that the DagBag loads DAG objects but think of it as more
> of
> > an "index" so that the scheduler/webserver know what DAGs are available.
> > When it's time to actually run one of those DAGs, the executor loads it
> > from the underlying source file.
> >
> > Jeremiah
> >
> > On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin 
> > wrote:
> >
> > > Hi,
> > >
> > > I have a weird question but it bugs my mind. I have some like below to
> > > generate dags dynamically, using Max's example code from FAQ.
> > >
> > > It works fine but I have one large dict (let's call it my_outer_dict)
> > that
> > > takes over 60Mb in memory and I need to access it from all generated
> > dags.
> > > Needless to say, i do not want to recreate that dict for every dag as I
> > > want to load it to memory only once.
> > >
> > > To my surprise, if i define that dag outside of my dag definition
> code, I
> > > can still access it.
> > >
> > > Can someone explain why and where is it stored? I thought only dag
> > > definitions are loaded to dagbag and not the variables outside it.
> > >
> > > Is it even a good practice and will it work still if I switch to celery
> > > executor?
> > >
> > >
> > > def get_dag(i):
> > > dag_id = 'foo_{}'.format(i)
> > > dag = DAG(dag_id)
> > > 
> > > print my_outer_dict
> > >
> > > my_outer_dict = {}
> > > for i in range(10):
> > > dag = get_dag(i)
> > > globals()[dag.dag_id] = dag
> > >
> >
>


Re: Airflow Newbie Question

2017-03-22 Thread Jorge Alpedrinha Ramos
Defining the dag inside the for loop would be the best way IMO; if you
define a subdag you would have the same problem.

Subdags seem to be more suited to repeating some patterns without rewriting
the code.


On Wed, Mar 22, 2017 at 4:49 PM Andrew Maguire 
wrote:

> Yeah was thinking that afterwards. Would that be a sub dag approach, or is
> it even as  easy as moving the dag definition itself into the for loop?
>
> Cheers
> Andy
>
> On Wed, Mar 22, 2017 at 4:45 PM Jorge Alpedrinha Ramos <
> jalpedrinhara...@gmail.com> wrote:
>
> > Hey!
> >
> > I've read your post and I would point that you could instead of creating
> > tasks on one dag for each lob, you could create one dag per each lob
> with a
> > 'my_bigquery_dag_for_{{lob}}'. This way you get a more isolated unit of
> > management (dag). The advantage here being that in the event of a failure
> > or data reprocess for a single lob you wouldn't need to reprocess all
> lobs.
> >
> > Other than that it is really cool that you're putting it out on a blog
> > post, it will definitely be useful.
> >
> > Cheers !
> >
> > On Wed, Mar 22, 2017 at 4:20 PM Andrew Maguire 
> > wrote:
> >
> > > Hi All,
> > >
> > > Just to follow up as fyi.
> > >
> > > Think i figured out at least one approach. Did a blog post on it
> > >
> > >
> >
> http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/
> > >
> > > Can't wait to get going on porting stuff over from cron.
> > >
> > > Cheers,
> > > Andy
> > >
> > > On Tue, Mar 21, 2017 at 2:49 PM Andrew Maguire 
> > > wrote:
> > >
> > > > Sorry if this is not the right place for this.
> > > >
> > > > I'm wondering if anyone might be able to help me with this question?
> > > >
> > > >
> > > >
> > >
> >
> http://stackoverflow.com/questions/42930221/best-way-to-loop-through-parameters-in-airflow
> > > >
> > > > Feel free to ignore if this is not the right place to try find help.
> > > >
> > > > Loving airflow btw!
> > > >
> > > > Cheers,
> > > > Andy
> > > >
> > >
> >
>


Re: Airflow Newbie Question

2017-03-22 Thread Andrew Maguire
Yeah, I was thinking that afterwards. Would that be a subdag approach, or is
it even as easy as moving the dag definition itself into the for loop?

Cheers
Andy

On Wed, Mar 22, 2017 at 4:45 PM Jorge Alpedrinha Ramos <
jalpedrinhara...@gmail.com> wrote:

> Hey!
>
> I've read your post and I would point that you could instead of creating
> tasks on one dag for each lob, you could create one dag per each lob with a
> 'my_bigquery_dag_for_{{lob}}'. This way you get a more isolated unit of
> management (dag). The advantage here being that in the event of a failure
> or data reprocess for a single lob you wouldn't need to reprocess all lobs.
>
> Other than that it is really cool that you're putting it out on a blog
> post, it will definitely be useful.
>
> Cheers !
>
> On Wed, Mar 22, 2017 at 4:20 PM Andrew Maguire 
> wrote:
>
> > Hi All,
> >
> > Just to follow up as fyi.
> >
> > Think i figured out at least one approach. Did a blog post on it
> >
> >
> http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/
> >
> > Can't wait to get going on porting stuff over from cron.
> >
> > Cheers,
> > Andy
> >
> > On Tue, Mar 21, 2017 at 2:49 PM Andrew Maguire 
> > wrote:
> >
> > > Sorry if this is not the right place for this.
> > >
> > > I'm wondering if anyone might be able to help me with this question?
> > >
> > >
> > >
> >
> http://stackoverflow.com/questions/42930221/best-way-to-loop-through-parameters-in-airflow
> > >
> > > Feel free to ignore if this is not the right place to try find help.
> > >
> > > Loving airflow btw!
> > >
> > > Cheers,
> > > Andy
> > >
> >
>


Re: Airflow Newbie Question

2017-03-22 Thread Jorge Alpedrinha Ramos
Hey!

I've read your post, and I would point out that instead of creating tasks
on one dag for each lob, you could create one dag per lob, with an id like
'my_bigquery_dag_for_{{lob}}'. This way you get a more isolated unit of
management (the dag). The advantage is that in the event of a failure or a
data reprocess for a single lob, you wouldn't need to reprocess all lobs.
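
A minimal sketch of that pattern, with a hypothetical lob list and start date
(using a literal str.format placeholder rather than the Jinja-style braces
above):

from datetime import datetime
from airflow import DAG

for lob in ['retail', 'wholesale', 'online']:  # hypothetical lobs
    dag = DAG('my_bigquery_dag_for_{}'.format(lob),
              start_date=datetime(2017, 3, 1),
              schedule_interval='@daily')
    # add this lob's BigQuery tasks to `dag` here
    globals()[dag.dag_id] = dag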

Other than that, it is really cool that you're putting it out in a blog
post; it will definitely be useful.

Cheers !

On Wed, Mar 22, 2017 at 4:20 PM Andrew Maguire 
wrote:

> Hi All,
>
> Just to follow up as fyi.
>
> Think i figured out at least one approach. Did a blog post on it
>
> http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/
>
> Can't wait to get going on porting stuff over from cron.
>
> Cheers,
> Andy
>
> On Tue, Mar 21, 2017 at 2:49 PM Andrew Maguire 
> wrote:
>
> > Sorry if this is not the right place for this.
> >
> > I'm wondering if anyone might be able to help me with this question?
> >
> >
> >
> http://stackoverflow.com/questions/42930221/best-way-to-loop-through-parameters-in-airflow
> >
> > Feel free to ignore if this is not the right place to try find help.
> >
> > Loving airflow btw!
> >
> > Cheers,
> > Andy
> >
>


Re: Airflow Newbie Question

2017-03-22 Thread Andrew Maguire
Hi All,

Just to follow up, as an FYI.

I think I figured out at least one approach and did a blog post on it:
http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/

Can't wait to get going on porting stuff over from cron.

Cheers,
Andy

On Tue, Mar 21, 2017 at 2:49 PM Andrew Maguire 
wrote:

> Sorry if this is not the right place for this.
>
> I'm wondering if anyone might be able to help me with this question?
>
>
> http://stackoverflow.com/questions/42930221/best-way-to-loop-through-parameters-in-airflow
>
> Feel free to ignore if this is not the right place to try find help.
>
> Loving airflow btw!
>
> Cheers,
> Andy
>


Re: 1.8.1 release

2017-03-22 Thread Ruslan Dautkhanov
Thank you Sid!


Best regards,
Ruslan

On Wed, Mar 22, 2017 at 12:01 AM, siddharth anand  wrote:

> Ruslan,
> Thanks for sharing this list. I can pick a few up. I agree we should aim to
> get some of them into 1.8.1.
>
> -s
>
> On Tue, Mar 21, 2017 at 2:29 PM, Ruslan Dautkhanov 
> wrote:
>
> > Some of the issues I ran into while testing 1.8rc5 :
> >
> > https://issues.apache.org/jira/browse/AIRFLOW-1015
> > > https://issues.apache.org/jira/browse/AIRFLOW-1013
> > > https://issues.apache.org/jira/browse/AIRFLOW-1004
> > > https://issues.apache.org/jira/browse/AIRFLOW-1003
> > > https://issues.apache.org/jira/browse/AIRFLOW-1001
> > > https://issues.apache.org/jira/browse/AIRFLOW-1015
> >
> >
> > It would be great to have at least some of them fixed in 1.8.1.
> >
> > Thank you.
> >
> >
> >
> >
> > --
> > Ruslan Dautkhanov
> >
> > On Tue, Mar 21, 2017 at 3:02 PM, Dan Davydov  > invalid
> > > wrote:
> >
> > > Here is my list for targeted 1.8.1 fixes:
> > > https://issues.apache.org/jira/browse/AIRFLOW-982
> > > https://issues.apache.org/jira/browse/AIRFLOW-983
> > > https://issues.apache.org/jira/browse/AIRFLOW-1019 (and in general the
> > > slow
> > > startup time from this new logic of orphaned/reset task)
> > > https://issues.apache.org/jira/browse/AIRFLOW-1017 (which I will
> > hopefully
> > > have a fix out for soon just finishing up tests)
> > >
> > > We are also hitting a new issue with subdags with rc5 that we weren't
> > > hitting with rc4 where subdags will occasionally just hang (had to roll
> > > back from rc5 to rc4), I'll try to spin up a JIRA for it soon which
> > should
> > > be on the list too.
> > >
> > >
> > > On Tue, Mar 21, 2017 at 1:54 PM, Chris Riccomini <
> criccom...@apache.org>
> > > wrote:
> > >
> > > > Agreed. I'm looking for a list of checksums/JIRAs that we want in the
> > > > bugfix release.
> > > >
> > > > On Tue, Mar 21, 2017 at 12:54 PM, Bolke de Bruin 
> > > > wrote:
> > > >
> > > > >
> > > > >
> > > > > > On 21 Mar 2017, at 12:51, Bolke de Bruin 
> > wrote:
> > > > > >
> > > > > > My suggestion, as we are using semantic versioning is:
> > > > > >
> > > > > > 1) no new features in the 1.8 branch
> > > > > > 2) only bug fixes in the 1.8 branch
> > > > > > 3) new features to land in 1.9
> > > > > >
> > > > > > This allows companies to
> > > > >
> > > > > Have a "known" version and can move to the new branch when they
> want
> > to
> > > > > get new features. Obviously we only support N-1, so when 1.10 comes
> > out
> > > > we
> > > > > stop supporting 1.8.X.
> > > > >
> > > > > >
> > > > > > Sent from my iPhone
> > > > > >
> > > > > >> On 21 Mar 2017, at 11:22, Chris Riccomini <
> criccom...@apache.org>
> > > > > wrote:
> > > > > >>
> > > > > >> Hey all,
> > > > > >>
> > > > > >> I suggest that we start a 1.8.1 Airflow release now. The goal
> > would
> > > > be:
> > > > > >>
> > > > > >> 1) get a second release under our belt
> > > > > >> 2) patch known issues with the 1.8.0 release
> > > > > >>
> > > > > >> I'm happy to run it, but I saw Maxime mentioning that Airbnb
> might
> > > > want
> > > > > to.
> > > > > >> @Max et al, can you comment?
> > > > > >>
> > > > > >> Also, can folks supply JIRAs for stuff that think needs to be in
> > the
> > > > > 1.8.1
> > > > > >> bugfix release?
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Chris
> > > > >
> > > >
> > >
> >
>


Re: Reminder : LatestOnlyOperator

2017-03-22 Thread Sumit Maheshwari
Vincent,

I think "dag.catchup = False" affects the whole DAG, means skipping all
tasks in it. While "LatestOnlyOperator" can be used to skip only some of
the tasks in a DAG as well.
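
A minimal sketch of that partial-skip layout (task names are hypothetical;
the import path matches the 1.8 example dags):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG('latest_only_demo', start_date=datetime(2017, 1, 1),
          schedule_interval='@daily')

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
always = DummyOperator(task_id='runs_for_every_interval', dag=dag)
skippable = DummyOperator(task_id='runs_for_latest_interval_only', dag=dag)

# only tasks downstream of latest_only are skipped on catch-up runs;
# `always` has no such ancestor, so it runs for every interval
latest_only.set_downstream(skippable)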



On Wed, Mar 22, 2017 at 7:05 PM, Vincent Poulain <
vincent.poul...@tinyclues.com> wrote:

> I did not see the clear explanation there :
> http://airflow.incubator.apache.org/concepts.html?
> highlight=provide_context#latest-run-only
>
> All good!
>
> On Wed, Mar 22, 2017 at 2:22 PM, Vincent Poulain <
> vincent.poul...@tinyclues.com> wrote:
>
> > Sid, in your example what is the difference between using the
> > LatestOnlyOperator & set catch_up feature to False ? "[The catch up
> > feature] kick off a DAG Run for any interval that has not been run"
> > I am still learning Airflow concepts too..
> >
> > Thanks!
> >
> > On Tue, Mar 21, 2017 at 10:31 PM, Ruslan Dautkhanov <
> dautkha...@gmail.com>
> > wrote:
> >
> >> Thank you for the detailed explanation Boris.
> >>
> >>
> >> Best regards,
> >>
> >> Ruslan Dautkhanov
> >>
> >> On Mon, Mar 20, 2017 at 12:12 PM, Boris Tyukin 
> >> wrote:
> >>
> >> > depends_on_past is looking at previous task instance which sounds the
> >> same
> >> > as "latestonly" but the difference becomes apparent if you look at
> this
> >> > example.
> >> >
> >> > Let's say you have a dag, scheduled to run every day and it has been
> >> > failing for the past 3 days. The whole purpose of that dag is to
> >> populate
> >> > snapshot table or do a daily backup.  If you use depends on past, you
> >> would
> >> > have to rerun all missed runs or mark them as successful eventually
> >> doing
> >> > useless work (3 daily snapshots or backups for the same data).
> >> >
> >> > LatestOnly allows you to bypass missed runs and just do it once for
> most
> >> > recent instance.
> >> >
> >> > Another difference, depends on past is tricky if you use
> BranchOperator
> >> > because some branches may not run one day and run another - it will
> >> really
> >> > mess up your logic.
> >> >
> >> > On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov <
> >> dautkha...@gmail.com>
> >> > wrote:
> >> >
> >> > > Thanks Boris. It does make sense.
> >> > > Although how it's different from depends_on_past task-level
> parameter?
> >> > > In both cases, a task will be skipped if there is another TI of this
> >> task
> >> > > is still running (from a previous dagrun), right?
> >> > >
> >> > >
> >> > > Thanks,
> >> > > Ruslan
> >> > >
> >> > >
> >> > > On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin <
> bo...@boristyukin.com>
> >> > > wrote:
> >> > >
> >> > > > you would just chain them - there is an example that came with
> >> airflow
> >> > > 1.8
> >> > > > https://github.com/apache/incubator-airflow/blob/master/
> >> > > > airflow/example_dags/example_latest_only.py
> >> > > >
> >> > > > so in your case, instead of dummy operator, you would use your
> >> Oracle
> >> > > > operator.
> >> > > >
> >> > > > Does it make sense?
> >> > > >
> >> > > >
> >> > > > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov <
> >> > dautkha...@gmail.com
> >> > > >
> >> > > > wrote:
> >> > > >
> >> > > > > Is there is a way to combine scheduling behavior operators
> (like
> >> > this
> >> > > > > LatestOnlyOperator)
> >> > > > > with a functional operator (like Oracle_Operator)? I was
> thinking
> >> > > > multiple
> >> > > > > inheritance would do,like
> >> > > > >
> >> > > > > > class Oracle_LatestOnly_Operator (Oracle_Operator,
> >> > > LatestOnlyOperator):
> >> > > > > > ...
> >> > > > >
> >> > > > > I might be overthinking this and there could be a simpler way?
> >> > > > > Sorry, I am still learning Airflow concepts...
> >> > > > >
> >> > > > > Thanks.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > Ruslan Dautkhanov
> >> > > > >
> >> > > > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <
> >> bo...@boristyukin.com
> >> > >
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Thanks George for that feature!
> >> > > > > >
> >> > > > > > sure, just created a jira on this
> >> > > > > > https://issues.apache.org/jira/browse/AIRFLOW-1008
> >> > > > > >
> >> > > > > >
> >> > > > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand <
> >> > san...@apache.org
> >> > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Thx Boris . Credit goes to George (gwax) for the
> >> implementation
> >> > of
> >> > > > the
> >> > > > > > > LatestOnlyOperator.
> >> > > > > > >
> >> > > > > > > Boris,
> >> > > > > > > Can you describe what you mean in a Jira?
> >> > > > > > > -s
> >> > > > > > >
> >> > > > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <
> >> > > bo...@boristyukin.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > this is nice indeed along with the new catchup option
> >> > > > > > > > https://airflow.incubator.apache.org/scheduler.html#
> >> > > > > > backfill-and-catchup
> >> > > > > > > >
> >> > > > > > > > Thanks Sid and Ben for adding these new options!
> >> > > > > > > >
> 

Re: variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
Hi Jeremiah, thanks for the explanation!

I am very new to Python, so I was surprised that it works and my external
dictionary object was still accessible to all generated dags. I think it
makes sense, but I would like to confirm one thing, and I do not know how to
test it myself.

Do you think that the large dictionary object will still be loaded into
memory only once, even if I generate 200 dags that will be accessing it? So,
basically, will they just use a reference to it, or will they create a copy
of the same 60Mb structure?

I hope my question makes sense :)
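
One way to test it, since CPython's id() returns an object's address: print
the id from inside the factory (names as in the earlier snippet) and check
that every generated dag reports the same value.

def get_dag(i):
    dag = DAG('foo_{}'.format(i))
    # the same id printed for all 200 dags means one shared dict, no copies
    print id(my_outer_dict)
    return dag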

On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin  wrote:

> At the risk of oversimplifying things, your DAG definition file is loaded
> *every* time a DAG (or any task in that DAG) is run. Think of it as a
> literal Python import of your dag-defining module: any variables are loaded
> along with the DAGs, which are then executed. That's why your dict is
> always available. This will work with Celery since it follows the same
> approach, parsing your DAG file to run each task.
>
> (By the way, this is why it's critical that all parts of your Airflow
> infrastructure have access to the same DAGS_FOLDER)
>
> Now it is true that the DagBag loads DAG objects but think of it as more of
> an "index" so that the scheduler/webserver know what DAGs are available.
> When it's time to actually run one of those DAGs, the executor loads it
> from the underlying source file.
>
> Jeremiah
>
> On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin 
> wrote:
>
> > Hi,
> >
> > I have a weird question but it bugs my mind. I have some like below to
> > generate dags dynamically, using Max's example code from FAQ.
> >
> > It works fine but I have one large dict (let's call it my_outer_dict)
> that
> > takes over 60Mb in memory and I need to access it from all generated
> dags.
> > Needless to say, i do not want to recreate that dict for every dag as I
> > want to load it to memory only once.
> >
> > To my surprise, if i define that dag outside of my dag definition code, I
> > can still access it.
> >
> > Can someone explain why and where is it stored? I thought only dag
> > definitions are loaded to dagbag and not the variables outside it.
> >
> > Is it even a good practice and will it work still if I switch to celery
> > executor?
> >
> >
> > def get_dag(i):
> > dag_id = 'foo_{}'.format(i)
> > dag = DAG(dag_id)
> > 
> > print my_outer_dict
> >
> > my_outer_dict = {}
> > for i in range(10):
> > dag = get_dag(i)
> > globals()[dag.dag_id] = dag
> >
>


Re: variable scope with dynamic dags

2017-03-22 Thread Jeremiah Lowin
At the risk of oversimplifying things, your DAG definition file is loaded
*every* time a DAG (or any task in that DAG) is run. Think of it as a
literal Python import of your dag-defining module: any variables are loaded
along with the DAGs, which are then executed. That's why your dict is
always available. This will work with Celery since it follows the same
approach, parsing your DAG file to run each task.

(By the way, this is why it's critical that all parts of your Airflow
infrastructure have access to the same DAGS_FOLDER)

Now it is true that the DagBag loads DAG objects but think of it as more of
an "index" so that the scheduler/webserver know what DAGs are available.
When it's time to actually run one of those DAGs, the executor loads it
from the underlying source file.

Jeremiah

On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin  wrote:

> Hi,
>
> I have a weird question but it bugs my mind. I have some like below to
> generate dags dynamically, using Max's example code from FAQ.
>
> It works fine but I have one large dict (let's call it my_outer_dict) that
> takes over 60Mb in memory and I need to access it from all generated dags.
> Needless to say, i do not want to recreate that dict for every dag as I
> want to load it to memory only once.
>
> To my surprise, if i define that dag outside of my dag definition code, I
> can still access it.
>
> Can someone explain why and where is it stored? I thought only dag
> definitions are loaded to dagbag and not the variables outside it.
>
> Is it even a good practice and will it work still if I switch to celery
> executor?
>
>
> def get_dag(i):
> dag_id = 'foo_{}'.format(i)
> dag = DAG(dag_id)
> 
> print my_outer_dict
>
> my_outer_dict = {}
> for i in range(10):
> dag = get_dag(i)
> globals()[dag.dag_id] = dag
>


Re: Reminder : LatestOnlyOperator

2017-03-22 Thread Vincent Poulain
I had not seen the clear explanation here:
http://airflow.incubator.apache.org/concepts.html?highlight=provide_context#latest-run-only

All good!

On Wed, Mar 22, 2017 at 2:22 PM, Vincent Poulain <
vincent.poul...@tinyclues.com> wrote:

> Sid, in your example what is the difference between using the
> LatestOnlyOperator & set catch_up feature to False ? "[The catch up
> feature] kick off a DAG Run for any interval that has not been run"
> I am still learning Airflow concepts too..
>
> Thanks!
>
> On Tue, Mar 21, 2017 at 10:31 PM, Ruslan Dautkhanov 
> wrote:
>
>> Thank you for the detailed explanation Boris.
>>
>>
>> Best regards,
>>
>> Ruslan Dautkhanov
>>
>> On Mon, Mar 20, 2017 at 12:12 PM, Boris Tyukin 
>> wrote:
>>
>> > depends_on_past is looking at previous task instance which sounds the
>> same
>> > as "latestonly" but the difference becomes apparent if you look at this
>> > example.
>> >
>> > Let's say you have a dag, scheduled to run every day and it has been
>> > failing for the past 3 days. The whole purpose of that dag is to
>> populate
>> > snapshot table or do a daily backup.  If you use depends on past, you
>> would
>> > have to rerun all missed runs or mark them as successful eventually
>> doing
>> > useless work (3 daily snapshots or backups for the same data).
>> >
>> > LatestOnly allows you to bypass missed runs and just do it once for most
>> > recent instance.
>> >
>> > Another difference, depends on past is tricky if you use BranchOperator
>> > because some branches may not run one day and run another - it will
>> really
>> > mess up your logic.
>> >
>> > On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov <
>> dautkha...@gmail.com>
>> > wrote:
>> >
>> > > Thanks Boris. It does make sense.
>> > > Although how it's different from depends_on_past task-level parameter?
>> > > In both cases, a task will be skipped if there is another TI of this
>> task
>> > > is still running (from a previous dagrun), right?
>> > >
>> > >
>> > > Thanks,
>> > > Ruslan
>> > >
>> > >
>> > > On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin 
>> > > wrote:
>> > >
>> > > > you would just chain them - there is an example that came with
>> airflow
>> > > 1.8
>> > > > https://github.com/apache/incubator-airflow/blob/master/
>> > > > airflow/example_dags/example_latest_only.py
>> > > >
>> > > > so in your case, instead of dummy operator, you would use your
>> Oracle
>> > > > operator.
>> > > >
>> > > > Does it make sense?
>> > > >
>> > > >
>> > > > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov <
>> > dautkha...@gmail.com
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Is there is a way to combine scheduling behavior operators  (like
>> > this
>> > > > > LatestOnlyOperator)
>> > > > > with a functional operator (like Oracle_Operator)? I was thinking
>> > > > multiple
>> > > > > inheritance would do,like
>> > > > >
>> > > > > > class Oracle_LatestOnly_Operator (Oracle_Operator,
>> > > LatestOnlyOperator):
>> > > > > > ...
>> > > > >
>> > > > > I might be overthinking this and there could be a simpler way?
>> > > > > Sorry, I am still learning Airflow concepts...
>> > > > >
>> > > > > Thanks.
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Ruslan Dautkhanov
>> > > > >
>> > > > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <
>> bo...@boristyukin.com
>> > >
>> > > > > wrote:
>> > > > >
>> > > > > > Thanks George for that feature!
>> > > > > >
>> > > > > > sure, just created a jira on this
>> > > > > > https://issues.apache.org/jira/browse/AIRFLOW-1008
>> > > > > >
>> > > > > >
>> > > > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand <
>> > san...@apache.org
>> > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Thx Boris . Credit goes to George (gwax) for the
>> implementation
>> > of
>> > > > the
>> > > > > > > LatestOnlyOperator.
>> > > > > > >
>> > > > > > > Boris,
>> > > > > > > Can you describe what you mean in a Jira?
>> > > > > > > -s
>> > > > > > >
>> > > > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <
>> > > bo...@boristyukin.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > this is nice indeed along with the new catchup option
>> > > > > > > > https://airflow.incubator.apache.org/scheduler.html#
>> > > > > > backfill-and-catchup
>> > > > > > > >
>> > > > > > > > Thanks Sid and Ben for adding these new options!
>> > > > > > > >
>> > > > > > > > for a complete picture, it would be nice to force only one
>> dag
>> > > run
>> > > > at
>> > > > > > the
>> > > > > > > > time.
>> > > > > > > >
>> > > > > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <
>> > > > san...@apache.org>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > With the Apache Airflow 1.8 release imminent, you may
>> want to
>> > > try
>> > > > > out
>> > > > > > > the
>> > > > > > > > >
>> > > > > > > > > *LatestOnlyOperator.*
>> > > > > > > > >
>> > > > > > > > > If you want your DAG to only run on the most recent
>> 

Re: Reminder : LatestOnlyOperator

2017-03-22 Thread Vincent Poulain
Sid, in your example, what is the difference between using the
LatestOnlyOperator and setting the catchup feature to False? "[The catch up
feature] kick off a DAG Run for any interval that has not been run"
I am still learning Airflow concepts too..

Thanks!

On Tue, Mar 21, 2017 at 10:31 PM, Ruslan Dautkhanov 
wrote:

> Thank you for the detailed explanation Boris.
>
>
> Best regards,
>
> Ruslan Dautkhanov
>
> On Mon, Mar 20, 2017 at 12:12 PM, Boris Tyukin 
> wrote:
>
> > depends_on_past is looking at previous task instance which sounds the
> same
> > as "latestonly" but the difference becomes apparent if you look at this
> > example.
> >
> > Let's say you have a dag, scheduled to run every day and it has been
> > failing for the past 3 days. The whole purpose of that dag is to populate
> > snapshot table or do a daily backup.  If you use depends on past, you
> would
> > have to rerun all missed runs or mark them as successful eventually doing
> > useless work (3 daily snapshots or backups for the same data).
> >
> > LatestOnly allows you to bypass missed runs and just do it once for most
> > recent instance.
> >
> > Another difference, depends on past is tricky if you use BranchOperator
> > because some branches may not run one day and run another - it will
> really
> > mess up your logic.
> >
> > On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov <
> dautkha...@gmail.com>
> > wrote:
> >
> > > Thanks Boris. It does make sense.
> > > Although how it's different from depends_on_past task-level parameter?
> > > In both cases, a task will be skipped if there is another TI of this
> task
> > > is still running (from a previous dagrun), right?
> > >
> > >
> > > Thanks,
> > > Ruslan
> > >
> > >
> > > On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin 
> > > wrote:
> > >
> > > > you would just chain them - there is an example that came with
> airflow
> > > 1.8
> > > > https://github.com/apache/incubator-airflow/blob/master/
> > > > airflow/example_dags/example_latest_only.py
> > > >
> > > > so in your case, instead of dummy operator, you would use your Oracle
> > > > operator.
> > > >
> > > > Does it make sense?
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov <
> > dautkha...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Is there is a way to combine scheduling behavior operators  (like
> > this
> > > > > LatestOnlyOperator)
> > > > > with a functional operator (like Oracle_Operator)? I was thinking
> > > > multiple
> > > > > inheritance would do,like
> > > > >
> > > > > > class Oracle_LatestOnly_Operator (Oracle_Operator,
> > > LatestOnlyOperator):
> > > > > > ...
> > > > >
> > > > > I might be overthinking this and there could be a simpler way?
> > > > > Sorry, I am still learning Airflow concepts...
> > > > >
> > > > > Thanks.
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Ruslan Dautkhanov
> > > > >
> > > > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <
> bo...@boristyukin.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Thanks George for that feature!
> > > > > >
> > > > > > sure, just created a jira on this
> > > > > > https://issues.apache.org/jira/browse/AIRFLOW-1008
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand <
> > san...@apache.org
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thx Boris . Credit goes to George (gwax) for the implementation
> > of
> > > > the
> > > > > > > LatestOnlyOperator.
> > > > > > >
> > > > > > > Boris,
> > > > > > > Can you describe what you mean in a Jira?
> > > > > > > -s
> > > > > > >
> > > > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <
> > > bo...@boristyukin.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > this is nice indeed along with the new catchup option
> > > > > > > > https://airflow.incubator.apache.org/scheduler.html#
> > > > > > backfill-and-catchup
> > > > > > > >
> > > > > > > > Thanks Sid and Ben for adding these new options!
> > > > > > > >
> > > > > > > > for a complete picture, it would be nice to force only one
> dag
> > > run
> > > > at
> > > > > > the
> > > > > > > > time.
> > > > > > > >
> > > > > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <
> > > > san...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > With the Apache Airflow 1.8 release imminent, you may want
> to
> > > try
> > > > > out
> > > > > > > the
> > > > > > > > >
> > > > > > > > > *LatestOnlyOperator.*
> > > > > > > > >
> > > > > > > > > If you want your DAG to only run on the most recent
> scheduled
> > > > slot,
> > > > > > > > > regardless of backlog, this operator will skip running
> > > downstream
> > > > > > tasks
> > > > > > > > for
> > > > > > > > > all DAG Runs prior to the current time slot.
> > > > > > > > >
> > > > > > > > > For example, I might have a DAG that takes a DB snapshot
> > once a
> > > > > day.
> > > > > > It
> > > > > > > > > might be that I paused that DAG for 2 weeks 

variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
Hi,

I have a weird question, but it bugs my mind. I have something like the code
below to generate dags dynamically, using Max's example code from the FAQ.

It works fine, but I have one large dict (let's call it my_outer_dict) that
takes over 60Mb in memory, and I need to access it from all generated dags.
Needless to say, I do not want to recreate that dict for every dag, as I
want to load it into memory only once.

To my surprise, if I define that dict outside of my dag definition code, I
can still access it.

Can someone explain why, and where it is stored? I thought only dag
definitions are loaded into the DagBag, and not the variables outside them.

Is it even good practice, and will it still work if I switch to the celery
executor?


def get_dag(i):
    dag_id = 'foo_{}'.format(i)
    dag = DAG(dag_id)
    # ... tasks elided ...
    print my_outer_dict  # the module-level dict is visible here
    return dag  # return the dag so the loop below can register it

my_outer_dict = {}
for i in range(10):
    dag = get_dag(i)
    globals()[dag.dag_id] = dag


Re: 1.8.1 release

2017-03-22 Thread siddharth anand
Ruslan,
Thanks for sharing this list. I can pick a few up. I agree we should aim to
get some of them into 1.8.1.

-s

On Tue, Mar 21, 2017 at 2:29 PM, Ruslan Dautkhanov 
wrote:

> Some of the issues I ran into while testing 1.8rc5 :
>
> https://issues.apache.org/jira/browse/AIRFLOW-1015
> > https://issues.apache.org/jira/browse/AIRFLOW-1013
> > https://issues.apache.org/jira/browse/AIRFLOW-1004
> > https://issues.apache.org/jira/browse/AIRFLOW-1003
> > https://issues.apache.org/jira/browse/AIRFLOW-1001
> > https://issues.apache.org/jira/browse/AIRFLOW-1015
>
>
> It would be great to have at least some of them fixed in 1.8.1.
>
> Thank you.
>
>
>
>
> --
> Ruslan Dautkhanov
>
> On Tue, Mar 21, 2017 at 3:02 PM, Dan Davydov  invalid
> > wrote:
>
> > Here is my list for targeted 1.8.1 fixes:
> > https://issues.apache.org/jira/browse/AIRFLOW-982
> > https://issues.apache.org/jira/browse/AIRFLOW-983
> > https://issues.apache.org/jira/browse/AIRFLOW-1019 (and in general the
> > slow
> > startup time from this new logic of orphaned/reset task)
> > https://issues.apache.org/jira/browse/AIRFLOW-1017 (which I will
> hopefully
> > have a fix out for soon just finishing up tests)
> >
> > We are also hitting a new issue with subdags with rc5 that we weren't
> > hitting with rc4 where subdags will occasionally just hang (had to roll
> > back from rc5 to rc4), I'll try to spin up a JIRA for it soon which
> should
> > be on the list too.
> >
> >
> > On Tue, Mar 21, 2017 at 1:54 PM, Chris Riccomini 
> > wrote:
> >
> > > Agreed. I'm looking for a list of checksums/JIRAs that we want in the
> > > bugfix release.
> > >
> > > On Tue, Mar 21, 2017 at 12:54 PM, Bolke de Bruin 
> > > wrote:
> > >
> > > >
> > > >
> > > > > On 21 Mar 2017, at 12:51, Bolke de Bruin 
> wrote:
> > > > >
> > > > > My suggestion, as we are using semantic versioning is:
> > > > >
> > > > > 1) no new features in the 1.8 branch
> > > > > 2) only bug fixes in the 1.8 branch
> > > > > 3) new features to land in 1.9
> > > > >
> > > > > This allows companies to
> > > >
> > > > Have a "known" version and can move to the new branch when they want
> to
> > > > get new features. Obviously we only support N-1, so when 1.10 comes
> out
> > > we
> > > > stop supporting 1.8.X.
> > > >
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > >> On 21 Mar 2017, at 11:22, Chris Riccomini 
> > > > wrote:
> > > > >>
> > > > >> Hey all,
> > > > >>
> > > > >> I suggest that we start a 1.8.1 Airflow release now. The goal
> would
> > > be:
> > > > >>
> > > > >> 1) get a second release under our belt
> > > > >> 2) patch known issues with the 1.8.0 release
> > > > >>
> > > > >> I'm happy to run it, but I saw Maxime mentioning that Airbnb might
> > > want
> > > > to.
> > > > >> @Max et al, can you comment?
> > > > >>
> > > > >> Also, can folks supply JIRAs for stuff that think needs to be in
> the
> > > > 1.8.1
> > > > >> bugfix release?
> > > > >>
> > > > >> Cheers,
> > > > >> Chris
> > > >
> > >
> >
>