[jira] [Commented] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-19?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263091#comment-15263091
 ] 

Chris Riccomini commented on AIRFLOW-19:


Honestly, I'm not entirely sure. We have a hard-coded config file that defines 
which tables we copy around. We don't use Hadoop/HDFS. Perhaps 
[~maxime.beauche...@apache.org] can comment.

One way I can think of doing this is to query the files directly from the 
Python script, rather than from a DistCpSensor, and then add all the operators 
in a for loop, without having to use a PythonOperator.
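
Roughly what I have in mind, as a sketch (the {{list_pending_files()}} helper, 
task ids, and DAG args below are all made up; it also assumes the list is known 
at DAG-definition time rather than coming through XCom):

{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def list_pending_files():
    # Hypothetical helper: query the source system (HDFS, a config file, a
    # database, ...) for the items to process. Note this runs when the DAG
    # file is parsed, not at task runtime.
    return ['table_a', 'table_b', 'table_c']


dag = DAG('copy_tables', start_date=datetime(2016, 4, 1),
          schedule_interval='@daily')

for item in list_pending_files():
    t2 = BashOperator(task_id='process_%s' % item,
                      bash_command='echo processing %s' % item,
                      dag=dag)
    t3 = BashOperator(task_id='finalize_%s' % item,
                      bash_command='echo finalizing %s' % item,
                      dag=dag)
    t2.set_downstream(t3)  # t2 -> t3 for each element
{code}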

> How can I have an Operator B iterate over a list returned from upstream by 
> Operator A?
> --
>
> Key: AIRFLOW-19
> URL: https://issues.apache.org/jira/browse/AIRFLOW-19
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Praveenkumar Venkatesan
>Priority: Minor
>  Labels: support
>
> Here is what I am trying to do exactly: 
> https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a
> The PythonOperator log just returns the following:
> [2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing 
>  on 2016-04-28 11:56:12
> [2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value 
> was: None
> It didn't even print my kwargs and to_process data.
> To simplify: let's say t1 returns 3 elements. I want to iterate over the 
> list and run t2 -> t3 for each element.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?

2016-04-28 Thread Praveenkumar Venkatesan (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-19?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263033#comment-15263033
 ] 

Praveenkumar Venkatesan commented on AIRFLOW-19:


[~criccomini] What is the best way to go about doing this in Airflow? Is there 
one?

Do you mean this would be better posted to the Google group? I posted this on 
Gitter, but there was no resolution on what the recommended way is.

> How can I have an Operator B iterate over a list returned from upstream by 
> Operator A?
> --
>
> Key: AIRFLOW-19
> URL: https://issues.apache.org/jira/browse/AIRFLOW-19
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Praveenkumar Venkatesan
>Priority: Minor
>  Labels: support
>
> Here is what I am trying to do exactly: 
> https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a
> The PythonOperator log just returns the following:
> [2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing 
>  on 2016-04-28 11:56:12
> [2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value 
> was: None
> It didn't even print my kwargs and to_process data.
> To simplify: let's say t1 returns 3 elements. I want to iterate over the 
> list and run t2 -> t3 for each element.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?

2016-04-28 Thread Praveenkumar Venkatesan (JIRA)
Praveenkumar Venkatesan created AIRFLOW-19:
--

 Summary: How can I have an Operator B iterate over a list returned 
from upstream by Operator A?
 Key: AIRFLOW-19
 URL: https://issues.apache.org/jira/browse/AIRFLOW-19
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Praveenkumar Venkatesan
Priority: Minor


Here is what I am trying to do exactly: 
https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a

The PythonOperator log just returns the following:

[2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing 
 on 2016-04-28 11:56:12
[2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value 
was: None

It didn't even print my kwargs and to_process data.

To simplify: let's say t1 returns 3 elements. I want to iterate over the 
list and run t2 -> t3 for each element.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262943#comment-15262943
 ] 

Chris Riccomini commented on AIRFLOW-14:


If there are two DRJs running simultaneously, this race condition can occur, as 
far as I know:

# DRJ0 and DRJ1 both see that DAG0 needs to be run
# DRJ0 checks if the lock is set. It's not.
# DRJ1 checks if the lock is set. It's not.
# DRJ0 sets lock to be owned by DRJ0.
# DRJ0 checks to see who the owner is. It's DRJ0. DRJ0 starts running the DAG.
# DRJ1 sets the lock to be owned by DRJ1.
# DRJ1 checks to see who the owner is. It's DRJ1. DRJ1 starts running the DAG.

The SQL that I wrote above prevents this from happening: the second update 
(step 6) has no effect because, at that point, lock_id is no longer null; it is 
already set to DRJ0.

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-18) Alembic's constraints and indexes are unnamed thus hard to drop or change

2016-04-28 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-18:
-

 Summary: Alembic's constraints and indexes are unnamed thus hard 
to drop or change
 Key: AIRFLOW-18
 URL: https://issues.apache.org/jira/browse/AIRFLOW-18
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Bolke de Bruin


E.g. in XXX_add_dagrun.py the constraint is added without a name:

sa.UniqueConstraint('dag_id', 'execution_date'),

This makes constraint naming database-specific, i.e. Postgres's name for the 
constraint will be different from MySQL's and SQLite's.

Best practice, per http://alembic.readthedocs.io/en/latest/naming.html, is to 
define naming conventions and have them applied consistently.
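
For illustration, a sketch of what that could look like (the convention strings 
below are just the example from the Alembic/SQLAlchemy docs, not necessarily 
what we want to adopt):

{code}
# Hypothetical sketch: attach a naming convention to the MetaData that the
# models (and Alembic) use, so generated constraints and indexes get stable,
# database-independent names.
from sqlalchemy import MetaData

NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=NAMING_CONVENTION)

# In a migration, the constraint can then also be given an explicit name, e.g.:
# sa.UniqueConstraint('dag_id', 'execution_date',
#                     name='dag_run_dag_id_execution_date_key')
{code}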



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-17) Master Travis CI build is broken

2016-04-28 Thread Arthur Wiedmer (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-17?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262573#comment-15262573
 ] 

Arthur Wiedmer commented on AIRFLOW-17:
---

Unfortunately, I have very little knowledge of how the license check actually 
works.

The code here seems very simplistic: 
https://github.com/airbnb/airflow/blob/master/scripts/ci/check-license.sh#L98
Maybe we can disable this check in the case of a revert.


> Master Travis CI build is broken
> 
>
> Key: AIRFLOW-17
> URL: https://issues.apache.org/jira/browse/AIRFLOW-17
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Chris Riccomini
>
> It looks like master is broken:
> https://travis-ci.org/airbnb/airflow/branches
> This build seems to be the first one that broke:
> https://travis-ci.org/airbnb/airflow/builds/126014622



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-17) Master Travis CI build is broken

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-17?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262525#comment-15262525
 ] 

Chris Riccomini commented on AIRFLOW-17:


Got it. Is there any way to fix it?

> Master Travis CI build is broken
> 
>
> Key: AIRFLOW-17
> URL: https://issues.apache.org/jira/browse/AIRFLOW-17
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Chris Riccomini
>
> It looks like master is broken:
> https://travis-ci.org/airbnb/airflow/branches
> This build seems to be the first one that broke:
> https://travis-ci.org/airbnb/airflow/builds/126014622



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-17) Master is broken

2016-04-28 Thread Chris Riccomini (JIRA)
Chris Riccomini created AIRFLOW-17:
--

 Summary: Master is broken
 Key: AIRFLOW-17
 URL: https://issues.apache.org/jira/browse/AIRFLOW-17
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Chris Riccomini


It looks like master is broken:

https://travis-ci.org/airbnb/airflow/branches

This build seems to be the first one that broke:

https://travis-ci.org/airbnb/airflow/builds/126014622



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (AIRFLOW-17) Master Travis CI build is broken

2016-04-28 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-17?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-17:
---
Summary: Master Travis CI build is broken  (was: Master is broken)

> Master Travis CI build is broken
> 
>
> Key: AIRFLOW-17
> URL: https://issues.apache.org/jira/browse/AIRFLOW-17
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Chris Riccomini
>
> It looks like master is broken:
> https://travis-ci.org/airbnb/airflow/branches
> This build seems to be the first one that broke:
> https://travis-ci.org/airbnb/airflow/builds/126014622



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (AIRFLOW-16) Use GCP-specific fields in hook view

2016-04-28 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-16?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-16:
---
Description: 
Once AIRFLOW-15 is done, we should update the Google cloud base hook to use 
fields for project, service account, etc. We currently just use a JSON blob in 
the {{extras}} field. We can steal this code from 
[this|https://github.com/airbnb/airflow/pull/1119/files] PR, where 
{{extras\_\_google_cloud_platform_*}} is introduced in views.py.

We should also look at creating just one hook of type google_cloud_platform, 
rather than one hook per Google cloud service. Again, this is how the PR 
(above) works, and it's pretty handy.

  was:
Once AIRFLOW-15 is done, we should update the Google cloud base hook to use 
fields for project, service account, etc. We currently just use a JSON blob in 
the {{extras}} field. We can steal this code from 
[this|https://github.com/airbnb/airflow/pull/1119/files] PR, where 
{{extras__google_cloud_platform_*}} is introduced in views.py.

We should also look at creating just one hook of type google_cloud_platform, 
rather than one hook per Google cloud service. Again, this is how the PR 
(above) works, and it's pretty handy.


> Use GCP-specific fields in hook view
> 
>
> Key: AIRFLOW-16
> URL: https://issues.apache.org/jira/browse/AIRFLOW-16
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Chris Riccomini
>
> Once AIRFLOW-15 is done, we should update the Google cloud base hook to use 
> fields for project, service account, etc. We currently just use a JSON blob 
> in the {{extras}} field. We can steal this code from 
> [this|https://github.com/airbnb/airflow/pull/1119/files] PR, where 
> {{extras\_\_google_cloud_platform_*}} is introduced in views.py.
> We should also look at creating just one hook of type google_cloud_platform, 
> rather than one hook per Google cloud service. Again, this is how the PR 
> (above) works, and it's pretty handy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-16) Use GCP-specific fields in hook view

2016-04-28 Thread Chris Riccomini (JIRA)
Chris Riccomini created AIRFLOW-16:
--

 Summary: Use GCP-specific fields in hook view
 Key: AIRFLOW-16
 URL: https://issues.apache.org/jira/browse/AIRFLOW-16
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Chris Riccomini


Once AIRFLOW-15 is done, we should update the Google cloud base hook to use 
fields for project, service account, etc. We currently just use a JSON blob in 
the {{extras}} field. We can steal this code from 
[this|https://github.com/airbnb/airflow/pull/1119/files] PR, where 
{{extras__google_cloud_platform_*}} is introduced in views.py.

We should also look at creating just one hook of type google_cloud_platform, 
rather than one hook per Google cloud service. Again, this is how the PR 
(above) works, and it's pretty handy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-15) Remove GCloud from Airflow

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-15?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262418#comment-15262418
 ] 

Chris Riccomini commented on AIRFLOW-15:


Pull request: https://github.com/airbnb/airflow/pull/1448

[~jlowin], could use a review.

> Remove GCloud from Airflow
> --
>
> Key: AIRFLOW-15
> URL: https://issues.apache.org/jira/browse/AIRFLOW-15
> Project: Apache Airflow
>  Issue Type: Task
>Reporter: Chris Riccomini
>Assignee: Chris Riccomini
>
> After speaking with Google, there was some concern about using the 
> [gcloud-python|https://github.com/GoogleCloudPlatform/gcloud-python] library 
> for Airflow. There are several concerns:
> # It's not clear (even to people at Google) what this library is, who owns 
> it, etc.
> # It does not support all services (the way 
> [google-api-python-client|https://github.com/google/google-api-python-client] 
> does).
> # There are compatibility issues between google-api-python-client and 
> gcloud-python.
> We currently support both libraries, depending on which package you 
> install: {{airflow[gcp_api]}} or {{airflow[gcloud]}}. This ticket is to remove 
> the {{airflow[gcloud]}} package, and all associated code.
> The main associated code, afaik, is the use of the {{gcloud}} library in the 
> Google Cloud Storage hooks/operators--specifically for Google Cloud Storage 
> Airflow logging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (AIRFLOW-6) Remove high-charts

2016-04-28 Thread Maxime Beauchemin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxime Beauchemin reassigned AIRFLOW-6:
---

Assignee: Maxime Beauchemin

> Remove high-charts
> --
>
> Key: AIRFLOW-6
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Chris Riccomini
>Assignee: Maxime Beauchemin
>
> According to LEGAL-246, we can't use High Charts in Airflow. The license is 
> incompatible with Apache. We should replace it with another library.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Jeremiah Lowin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262359#comment-15262359
 ] 

Jeremiah Lowin commented on AIRFLOW-14:
---

Yes, that situation could arise... I'm not sure of the best way to handle this 
in a database-agnostic way (i.e. via SQLAlchemy). I'd appreciate any suggestions!

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262371#comment-15262371
 ] 

Chris Riccomini commented on AIRFLOW-14:


One way to do this is make the lock claim something like:

{code:sql}
UPDATE dag_run SET lock_id = 'my_lock_id' WHERE lock_id IS NULL AND id = 123;
SELECT lock_id FROM dag_run WHERE id = 123;
{code}

Then:

{code}
if lock_id == 'my_lock_id':
  # run the dag_run
{code}
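
A rough database-agnostic version of the same claim via SQLAlchemy, for 
illustration only (it assumes a DagRun model with a nullable {{lock_id}} 
column, which doesn't exist today; all names are made up):

{code}
# Hypothetical sketch of the lock claim using an ORM session. It assumes a
# mapped model roughly like:
#
#     class DagRun(Base):
#         id = Column(Integer, primary_key=True)
#         lock_id = Column(String, nullable=True)
#
# The conditional UPDATE is atomic, so at most one DagRunJob wins the claim.
def try_claim_dag_run(session, dag_run_id, my_lock_id):
    session.query(DagRun) \
        .filter(DagRun.id == dag_run_id, DagRun.lock_id.is_(None)) \
        .update({DagRun.lock_id: my_lock_id}, synchronize_session=False)
    session.commit()

    # Re-read to confirm ownership before actually running the DagRun.
    lock_id = session.query(DagRun.lock_id) \
        .filter(DagRun.id == dag_run_id) \
        .scalar()
    return lock_id == my_lock_id
{code}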

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Jeremiah Lowin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262357#comment-15262357
 ] 

Jeremiah Lowin commented on AIRFLOW-14:
---

Yes, that situation could arise... I'm not sure of the best way to handle this 
in a database-agnostic way (i.e. via SQLAlchemy). I'd appreciate any suggestions!

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-15) Remove GCloud from Airflow

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-15?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262352#comment-15262352
 ] 

Chris Riccomini commented on AIRFLOW-15:


Removes these:

https://github.com/airbnb/airflow/pull/1137
https://github.com/airbnb/airflow/pull/1119

> Remove GCloud from Airflow
> --
>
> Key: AIRFLOW-15
> URL: https://issues.apache.org/jira/browse/AIRFLOW-15
> Project: Apache Airflow
>  Issue Type: Task
>Reporter: Chris Riccomini
>Assignee: Chris Riccomini
>
> After speaking with Google, there was some concern about using the 
> [gcloud-python|https://github.com/GoogleCloudPlatform/gcloud-python] library 
> for Airflow. There are several concerns:
> # It's not clear (even to people at Google) what this library is, who owns 
> it, etc.
> # It does not support all services (the way 
> [google-api-python-client|https://github.com/google/google-api-python-client] 
> does).
> # There are compatibility issues between google-api-python-client and 
> gcloud-python.
> We currently support both libraries, depending on which package you 
> install: {{airflow[gcp_api]}} or {{airflow[gcloud]}}. This ticket is to remove 
> the {{airflow[gcloud]}} package, and all associated code.
> The main associated code, afaik, is the use of the {{gcloud}} library in the 
> Google Cloud Storage hooks/operators--specifically for Google Cloud Storage 
> Airflow logging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Jeremiah Lowin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262324#comment-15262324
 ] 

Jeremiah Lowin edited comment on AIRFLOW-14 at 4/28/16 3:22 PM:


DagRuns are primary keyed by (dag_id, execution_date), so there is only one 
canonical version. DRJ needs to make sure to refresh from the db to check for a 
lock immediately before running it, however. The mechanism is very similar to 
TaskInstance -- you can create as many TI objects as you want, but they all 
point at the one canonical version and can be refreshed at any time to reflect 
the "true" state.

DRJs pick up DagRuns in two ways:
1. explicitly via {{DagRunJob.submit_dags()}}. This is used for example by 
BackfillJob; it generates a bunch of DagRuns and calls {{submit_dags()}} to 
submit them to itself. Then it enters its loop. Scheduler also uses this after 
scheduling a DagRun, though it's actually redundant because of the second way 
(below)
2. automatically via {{DagRunJob.collect_dags()}}. This is called inside each 
DRJ loop and is used by SchedulerJob to look for any active DagRuns and add 
them to its set of DagRuns to execute.


was (Author: jlowin):
DagRuns are primary keyed by (dag_id, execution_date), so there is only one 
canonical version. DRJ needs to make sure to refresh from the db to check for a 
lock immediately before running it, however. The mechanism is very similar to 
TaskInstance -- you can create as many TI objects as you want, but they all 
point at the one canonical version and can be refreshed at any time to reflect 
the "true" state.

DRJs pick up DagRuns in two ways:
1. explicitly via {{DagRunJob.submit_dags()}}. This is used for example by 
BackfillJob; it generates a bunch of DagRuns and calls {{submit_dags()}} to 
submit them to itself. Then it enters its loop. Scheduler also uses this after 
scheduling a DagRun, though it's actually redundant because of the second way 
(below)
2. automatically via {{DagRunJob.collect_dags()}}. This is used by SchedulerJob 
to look for any active DagRuns and add them to its list.

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Jeremiah Lowin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262324#comment-15262324
 ] 

Jeremiah Lowin commented on AIRFLOW-14:
---

DagRuns are primary keyed by (dag_id, execution_date), so there is only one 
canonical version. DRJ needs to make sure to refresh from the db to check for a 
lock immediately before running it, however. The mechanism is very similar to 
TaskInstance -- you can create as many TI objects as you want, but they all 
point at the one canonical version and can be refreshed at any time to reflect 
the "true" state.

DRJs pick up DagRuns in two ways:
1. explicitly via {{DagRunJob.submit_dags()}}. This is used for example by 
BackfillJob; it generates a bunch of DagRuns and calls {{submit_dags()}} to 
submit them to itself. Then it enters its loop. Scheduler also uses this after 
scheduling a DagRun, though it's actually redundant because of the second way 
(below)
2. automatically via {{DagRunJob.collect_dags()}}. This is used by SchedulerJob 
to look for any active DagRuns and add them to its list.
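
As a very rough structural outline of what's being described (method names 
follow the proposal quoted below; everything else here is illustrative, not 
real Airflow code):

{code}
class DagRunJob(object):
    """Hypothetical outline of the proposed DagRunJob loop."""

    def __init__(self):
        self.dag_runs = set()  # the DagRuns this job is responsible for

    def submit_dagruns(self, dag_runs):
        # Explicit submission, e.g. BackfillJob submitting runs to itself
        # before it enters its loop.
        self.dag_runs.update(dag_runs)

    def collect_dagruns(self):
        # Automatic collection, called on each loop; a SchedulerJob would pull
        # in all active DagRuns here.
        pass

    def refresh_dags(self):
        pass

    def heartbeat(self):
        pass

    def process_dagrun(self, dag_run):
        # Refresh from the DB, skip the run if another job holds the lock,
        # otherwise lock it, run it, and update its success/failure state.
        pass

    def run(self):
        while self.dag_runs:
            self.refresh_dags()
            self.collect_dagruns()
            for dag_run in list(self.dag_runs):
                self.process_dagrun(dag_run)
            self.heartbeat()
{code}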

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-15) Remove GCloud from Airflow

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-15?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262248#comment-15262248
 ] 

Chris Riccomini commented on AIRFLOW-15:


Note: the compatibility issue is because we currently use 
{{SignedJwtAssertionCredentials}}, which was removed in the latest Google oauth 
library. Upgrading gcp_api breaks due to this backwards incompatibility (we'll 
have to update our code). But gcloud requires the latest oauth library.

> Remove GCloud from Airflow
> --
>
> Key: AIRFLOW-15
> URL: https://issues.apache.org/jira/browse/AIRFLOW-15
> Project: Apache Airflow
>  Issue Type: Task
>Reporter: Chris Riccomini
>Assignee: Chris Riccomini
>
> After speaking with Google, there was some concern about using the 
> [gcloud-python|https://github.com/GoogleCloudPlatform/gcloud-python] library 
> for Airflow. There are several concerns:
> # It's not clear (even to people at Google) what this library is, who owns 
> it, etc.
> # It does not support all services (the way 
> [google-api-python-client|https://github.com/google/google-api-python-client] 
> does).
> # There are compatibility issues between google-api-python-client and 
> gcloud-python.
> We currently support both libraries, depending on which package you 
> install: {{airflow[gcp_api]}} or {{airflow[gcloud]}}. This ticket is to remove 
> the {{airflow[gcloud]}} package, and all associated code.
> The main associated code, afaik, is the use of the {{gcloud}} library in the 
> Google Cloud Storage hooks/operators--specifically for Google Cloud Storage 
> Airflow logging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

2016-04-28 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262235#comment-15262235
 ] 

Chris Riccomini commented on AIRFLOW-14:


Overall, major +1 on this.

One thing that I think is glossed over: what happens when a DagRunJob dies 
({{kill -9}}, for example)? You mention heartbeats, but I don't see a mention 
of who is going to clean up or take over the execution of the wedged DagRuns 
when their job dies. I see this problem a lot with Airflow right now (DagRuns 
stuck in 'running' forever), so it'd be good to think it through.

> DagRun Refactor (Scheduler 2.0)
> ---
>
> Key: AIRFLOW-14
> URL: https://issues.apache.org/jira/browse/AIRFLOW-14
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they 
> should be called DagInstances?). To run a DAG – or to manage the execution of 
> a DAG – a DagRun must first be created. This can be done manually (simply by 
> creating a DagRun object) or automatically, using methods like 
> dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc 
> runs can be done by any process at any time, simply by creating the 
> appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as 
> creating a TaskInstance is not the same as actually running a task). We need 
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a 
> set of DagRuns that it is tasked with executing, and loops over that set 
> until all the DagRuns either succeed or fail. New DagRuns can be passed to 
> the job explicitly via DagRunJob.submit_dagruns() or by defining its 
> DagRunJob.collect_dagruns() method, which is called during each loop. When 
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs 
> will not try to execute locked DagRuns. This way, many DagRunJobs can run 
> simultaneously in either a local or distributed setting, and can even be 
> pointed at the same DagRuns, without worrying about collisions or 
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current 
> SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up 
> ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns 
> corresponding to the requested start/end dates and submits them to itself 
> prior to initiating its loop.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)