[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-16 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r367549812
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self):
 ti_op2.set_state(state=State.NONE, session=session)
 
 dr.update_state()
-self.assertEqual(dr.state, State.RUNNING)
+self.assertEqual(dr.state, State.FAILED)
 
+dr.set_state(State.RUNNING)
 
 Review comment:
   So before your change these tasks ended up in this state: `[, ]`. And looking at the TriggerRuleDep for ONE_FAILED:
   
   ```
  elif tr == TR.ONE_FAILED:
      if upstream_done and not (failed or upstream_failed):
          ti.set_state(State.SKIPPED, session)
   ```
   
   So that looks right - op2/B should be in SKIPPED state.
   
   @amichai07 So this is a bug, and the test was right and shouldn't be changed.
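  As a standalone illustration, the ONE_FAILED branch quoted above can be modelled with a tiny function (a simplified sketch, not Airflow's actual `TriggerRuleDep`; the string states are placeholders):

  ```python
  def one_failed_outcome(upstream_done, failed, upstream_failed):
      # Simplified model of the quoted branch: with trigger rule ONE_FAILED,
      # once every upstream task has finished and none of them failed,
      # the downstream task can never be triggered, so it is skipped.
      if upstream_done and not (failed or upstream_failed):
          return "skipped"
      return "pending"

  # op1 succeeded (upstream done, zero failures): op2/B ends up skipped.
  print(one_failed_outcome(upstream_done=True, failed=0, upstream_failed=0))
  ```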


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r367147960
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self):
 ti_op2.set_state(state=State.NONE, session=session)
 
 dr.update_state()
-self.assertEqual(dr.state, State.RUNNING)
+self.assertEqual(dr.state, State.FAILED)
 
 Review comment:
   This _does_ look more right now, I just wonder why it was in RUNNING state 
before?




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r367147759
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self):
 ti_op2.set_state(state=State.NONE, session=session)
 
 dr.update_state()
-self.assertEqual(dr.state, State.RUNNING)
+self.assertEqual(dr.state, State.FAILED)
 
 Review comment:
  I'm still not 100% sure about this change. I hope to look at it tomorrow 
(because otherwise I'll merge https://github.com/apache/airflow/pull/6792/files 
which is going to cause a number of conflicts for one of us)




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-10 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r365368753
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
  Wait. What? I am not sure of anything anymore.




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-10 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r365368138
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
  Okay, I've gone back to the original commit where we introduced the concept 
of deadlocks: 24a7072990fb232a6fe23c9eb3fdb72ba0695c96, and it says this:
   
   ```
   3. Deadlock — A dag run is deadlocked when no action is possible.
   This is determined by the presence of unfinished tasks without met
   dependencies. However, care must be taken when depends_on_past=True
   because individual dag runs could *look* like they are deadlocked
   when they are actually just waiting for earlier runs to finish.
   ```
   
   (not what I'd traditionally call a deadlock, but okay).
   
  So I think this is _wrong_. Since the task is in the SUCCESS state but the 
trigger rule is one_failed, it should cause the "deadlock" (also known as a 
terminal state): when ti_op1 is in the SUCCESS state, ti_op2 won't ever be able 
to run, so this should cause the run to fail.
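  That "no action is possible" condition boils down to a simple predicate; a minimal sketch (hypothetical helper, not Airflow's implementation):

  ```python
  def is_deadlocked(unfinished_tis, deps_met):
      # A run is "deadlocked" (terminal) when unfinished tasks remain but
      # none of them can ever have its dependencies met -- no action possible.
      return bool(unfinished_tis) and not any(deps_met(ti) for ti in unfinished_tis)

  # ti_op2 has trigger rule one_failed but ti_op1 succeeded: its deps can
  # never be met, so the run is deadlocked and should be marked FAILED.
  print(is_deadlocked(["ti_op2"], lambda ti: False))
  ```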




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-09 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364663807
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
   (This test is so badly named -- that is not a "deadlock".)




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-09 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364662668
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
  Let me double-check the rest of this test in detail then. You may be right, 
I just wanted to make sure, as this cuts right into the core of Airflow :)




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364240075
 
 

 ##
 File path: tests/jobs/test_scheduler_job.py
 ##
 @@ -2034,7 +2034,7 @@ def test_dagrun_root_fail_unfinished(self):
 ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
 ti.state = State.NONE
 session.commit()
-dr_state = dr.update_state()
+dr_state = dr.state
 
 Review comment:
   This looks suspect -- I would expect it to be
   ```suggestion
   dr.update_state()
   dr_state = dr.state
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364240875
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -346,7 +331,38 @@ def update_state(self, session=None):
 session.merge(self)
 session.commit()
 
-return self.state
+return ready_tis
+
+@provide_session
+def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+    ready_tis = []
+    for st in scheduleable_tasks:
+        if st.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    finished_tasks=finished_tasks),
+                session=session):
+            ready_tis.append(st)
+    return ready_tis
+
+@provide_session
+def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+    # this is an optimization to avoid running on tasks that are not ready twice
+    not_ready_tis = []
+    # there might be runnable tasks that are up for retry and from some
+    # reason (retry delay, etc) are not ready yet so we set the flags to
+    # count them in
+    for ut in unfinished_tasks:
+        if ut.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    ignore_in_retry_period=True,
+                    ignore_in_reschedule_period=True,
+                    finished_tasks=finished_tasks),
+                session=session):
+            return False, not_ready_tis
 
 Review comment:
   Do you think you'd be able to add a test to cover this case (that a single 
call to `dr.update_state` updates the state for all these TIs?)
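  Such a test could look roughly like this sketch, using a toy stand-in instead of a real `DagRun` (the real test would build a fan-out DAG with the suite's fixtures):

  ```python
  def toy_update_state(tis, failed_upstream_ids):
      # Stand-in for DagRun.update_state over a fan-out DAG: a single call
      # should flip *every* TI whose upstream failed, not just the first.
      for ti in tis:
          if ti["id"] in failed_upstream_ids:
              ti["state"] = "upstream_failed"
      return [ti["id"] for ti in tis if ti["state"] == "none"]

  def test_single_update_marks_all_fanout_tis():
      tis = [{"id": t, "state": "none"} for t in ("t1", "t2", "t3", "t4")]
      ready = toy_update_state(tis, {"t1", "t2", "t3", "t4"})
      assert ready == []
      assert all(ti["state"] == "upstream_failed" for ti in tis)

  test_single_update_marks_all_fanout_tis()
  ```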




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364239552
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,11 +34,30 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+@provide_session
+def _get_states_count_upstream_ti(ti, finished_tasks, session):
+    """
+    This function returns the states of the upstream tis for a specific ti
+    in order to determine whether this ti can run in this iteration
+
+    :param ti: the ti that we want to calculate deps for
+    :type ti: airflow.models.TaskInstance
+    :param finished_tasks: all the finished tasks of the dag_run
+    :type finished_tasks: list[airflow.models.TaskInstance]
+    """
+    if not finished_tasks:
+        # this is for the strange feature of running tasks without dag_run
+        finished_tasks = [t for t in ti.task.dag.get_task_instances(
+            start_date=ti.execution_date,
+            end_date=ti.execution_date)
 
 Review comment:
  And we should pass the same `session` down to 
`ti.task.dag.get_task_instances()`.




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364240472
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
   Why did you need to change this test? It doesn't look like it should need 
any changes




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364226856
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,48 +262,34 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks of this run
 
 Review comment:
   This is not a param -- I think you meant `:return:` and `:rtype:`




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364239180
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,11 +34,30 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+@provide_session
+def _get_states_count_upstream_ti(ti, finished_tasks, session):
+    """
+    This function returns the states of the upstream tis for a specific ti
+    in order to determine whether this ti can run in this iteration
+
+    :param ti: the ti that we want to calculate deps for
+    :type ti: airflow.models.TaskInstance
+    :param finished_tasks: all the finished tasks of the dag_run
+    :type finished_tasks: list[airflow.models.TaskInstance]
+    """
+    if not finished_tasks:
+        # this is for the strange feature of running tasks without dag_run
+        finished_tasks = [t for t in ti.task.dag.get_task_instances(
+            start_date=ti.execution_date,
+            end_date=ti.execution_date)
 
 Review comment:
   We could do the filtering in the DB here?
   
  ```suggestion
          end_date=ti.execution_date,
          state=(State.finished() + [State.UPSTREAM_FAILED])
  ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364228229
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -346,7 +331,38 @@ def update_state(self, session=None):
 session.merge(self)
 session.commit()
 
-return self.state
+return ready_tis
+
+@provide_session
+def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
 
 Review comment:
  This feels more like a private method, especially since it takes two lists of 
tasks - I think we should make it "private" (`_get_ready_tis`) and remove the 
`@provide_session` decorator too, so that callers are required to pass in and 
use the same DB session.
  
  For consistency with the rest of the code, please also make `session` the last 
argument.




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364232692
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,48 +262,34 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks of this run
+:type finished_tasks: list[airflow.models.TaskInstance]
 :return: State
 """
 
 dag = self.get_dag()
-
-tis = self.get_task_instances(session=session)
-self.log.debug("Updating state for %s considering %s task(s)", self, 
len(tis))
+ready_tis = []
+tis = [ti for ti in self.get_task_instances(session=session) if 
ti.state != State.REMOVED]
 
 Review comment:
  ~What was your reason for filtering out removed TIs here?~ Oh, we did this 
anyway in the next loop. Gotcha.
  
  I think it would be better if we filtered by using the existing `state` 
parameter of `get_task_instances` rather than having to filter like this.
  
  We could either do that by passing in a list of states that excludes REMOVED, 
or, if we change `get_task_instances` to return a query (i.e. don't do `return 
tis.all()` in there, but instead `return tis` -- this will behave almost the 
same when iterated over), we can then extend the filtering here. For example:
  
  ```suggestion
  tis = self.get_task_instances(session=session).filter(TaskInstance.state != State.REMOVED).all()
  ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364238302
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -346,7 +331,38 @@ def update_state(self, session=None):
 session.merge(self)
 session.commit()
 
-return self.state
+return ready_tis
+
+@provide_session
+def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+    ready_tis = []
+    for st in scheduleable_tasks:
+        if st.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    finished_tasks=finished_tasks),
+                session=session):
+            ready_tis.append(st)
+    return ready_tis
+
+@provide_session
+def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+    # this is an optimization to avoid running on tasks that are not ready twice
+    not_ready_tis = []
+    # there might be runnable tasks that are up for retry and from some
+    # reason (retry delay, etc) are not ready yet so we set the flags to
+    # count them in
+    for ut in unfinished_tasks:
+        if ut.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    ignore_in_retry_period=True,
+                    ignore_in_reschedule_period=True,
+                    finished_tasks=finished_tasks),
+                session=session):
+            return False, not_ready_tis
 
 Review comment:
  This would abort the loop early as soon as the first unfinished task with met 
dependencies is found, which I don't think we want to do for two reasons:
  
  1. If the DAG has more "paths", one path might not be ready but other tasks 
in the DAG could be.
  2. If a DAG has a lot of "fan-out" (`base >> [t1, t2, t3, t4]` etc.) then 
only one of the `tN` would get marked as upstream_failed per loop -- they should 
all be marked as failed at the same time really.
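  The collect-everything version of that loop can be sketched in plain Python (a toy model; `deps_met` stands in for `are_dependencies_met`):

  ```python
  def check_unfinished(unfinished_tis, deps_met):
      # Visit *every* unfinished TI instead of returning on the first hit,
      # so all fan-out tasks get classified in the same scheduler loop.
      not_ready = []
      for ti in unfinished_tis:
          if not deps_met(ti):
              not_ready.append(ti)
      # the run is still progressing iff at least one TI had its deps met
      return len(not_ready) < len(unfinished_tis), not_ready

  ok, not_ready = check_unfinished(["t1", "t2", "t3"], lambda ti: ti == "t2")
  print(ok, not_ready)  # True ['t1', 't3']
  ```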




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-19 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r348126662
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, 
task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
-dep_context=DepContext(flag_upstream_failed=True),
-session=session):
+self.log.debug("Examining active DAG run: %s", run)
+for ti in run.ready_tis:
 
 Review comment:
   I thought `dr.update_state` currently didn't return anything? Did I read the 
code wrong?




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347502238
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, 
task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
-dep_context=DepContext(flag_upstream_failed=True),
-session=session):
+self.log.debug("Examining active DAG run: %s", run)
+for ti in run.ready_tis:
 
 Review comment:
   Rather than a property on the run, lets make it the return value from 
`update_state` (that way we also don't have to "clear" it afterwards)
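  A rough sketch of the suggested shape (toy classes, not the real models): the scheduler consumes the return value directly, so no per-run property has to be reset between loops.

  ```python
  class ToyDagRun:
      def __init__(self, tis):
          self.tis = tis

      def update_state(self):
          # ... update the run's overall state from TI states (elided) ...
          # then return the TIs that are ready to be scheduled
          return [ti for ti in self.tis if ti["deps_met"]]

  run = ToyDagRun([{"id": "t1", "deps_met": True},
                   {"id": "t2", "deps_met": False}])
  for ti in run.update_state():
      print(ti["id"])  # t1
  ```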




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347500085
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, 
task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
 
 Review comment:
   (I do like this change overall though, just need to check it hasn't subtly 
changed a behaviour.)




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347499072
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, 
task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
 
 Review comment:
  I'm trying to think if there's a possible change of behaviour here - the 
`run.update_state()` path was called with `ignore_in_retry_period=True, 
ignore_in_reschedule_period=True` but we aren't doing that here.




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347295415
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti the ti that we want to calculate deps for
+:type ti airflow.models.TaskInstance
+:param finished_tasks all the finished tasks of the dag_run
+:type finished_tasks of finished ti's
+"""
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+upstream_tasks = [finished_task for finished_task in finished_tasks
+                  if finished_task.task_id in ti.task.upstream_task_ids]
+if upstream_tasks:
+    upstream_tasks_sorted = sorted(upstream_tasks, key=lambda x: x.state)
+    for k, g in groupby(upstream_tasks_sorted, key=lambda x: x.state):
+        if k == State.SUCCESS:
+            successes = len(list(g))
+        elif k == State.SKIPPED:
+            skipped = len(list(g))
+        elif k == State.FAILED:
+            failed = len(list(g))
+        elif k == State.UPSTREAM_FAILED:
+            upstream_failed = len(list(g))
 
 Review comment:
   @noamelf Nice! I wasn't familiar with the class either.
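  For reference, the sort + `groupby` in the diff can be collapsed into a single pass with `collections.Counter` (presumably the class being referred to; states are shown as plain strings for this sketch):

  ```python
  from collections import Counter

  # Count upstream TI states in one pass -- no sorting required.
  upstream_states = ["success", "success", "failed", "skipped"]
  counts = Counter(upstream_states)
  successes = counts["success"]
  skipped = counts["skipped"]
  failed = counts["failed"]
  upstream_failed = counts["upstream_failed"]  # missing keys default to 0
  print(successes, skipped, failed, upstream_failed)  # 2 1 1 0
  ```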




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346968508
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti the ti that we want to calculate deps for
+:type ti airflow.models.TaskInstance
 
 Review comment:
   ```suggestion
   :type ti: airflow.models.TaskInstance
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346968553
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti the ti that we want to calculate deps for
 
 Review comment:
   ```suggestion
   :param ti: the ti that we want to calculate deps for
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346968400
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti the ti that we want to calculate deps for
+:type ti airflow.models.TaskInstance
+:param finished_tasks all the finished tasks of the dag_run
+:type finished_tasks of finished ti's
 
 Review comment:
   ```suggestion
   :type finished_tasks: list[airflow.models.TaskInstance]
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346964896
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +75,33 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+if dep_context.finished_tasks is None:
 
 Review comment:
   Is this path actually hit from anywhere? If not, I'd much rather we deleted this code.
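
   For illustration, the per-dag-run counting that replaces this SQL aggregation can be sketched in plain Python; the tuple shape, the state strings, and the `count_upstream_states` helper below are simplifying assumptions for the sketch, not Airflow's real `TaskInstance` objects:

   ```python
   from collections import Counter

   # Stand-in for the dag run's finished task instances: (task_id, state) pairs.
   finished_tasks = [
       ("op1", "success"),
       ("op2", "failed"),
       ("op3", "skipped"),
       ("op4", "upstream_failed"),
       ("op5", "success"),
   ]

   def count_upstream_states(upstream_task_ids, finished_tasks):
       """Count finished-task states among one TI's upstream tasks in Python,
       replacing the per-TI SQL aggregation with a pass over the dag run's
       already-fetched finished tasks."""
       counter = Counter(
           state for task_id, state in finished_tasks
           if task_id in upstream_task_ids
       )
       successes = counter["success"]
       skipped = counter["skipped"]
       failed = counter["failed"]
       upstream_failed = counter["upstream_failed"]
       done = sum(counter.values())
       return successes, skipped, failed, upstream_failed, done

   print(count_upstream_states({"op1", "op2", "op4"}, finished_tasks))
   # → (1, 0, 1, 1, 3)
   ```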




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346970934
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -717,7 +718,10 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = run.get_task_instances(state=State.finished() + [State.UPSTREAM_FAILED],
+session=session)
 
 Review comment:
   This works, but it asks for a lot more columns and rows than we need.
   
   We could try changing the return inside this function from `return tis.all()` to just `return tis`, and this line could become:
   
   ```python
   finished_tasks = run.get_task_instances(state=State.finished() + [State.UPSTREAM_FAILED],
                                           session=session).options(load_only("task_id", "state"))
   ```
   
   
https://docs.sqlalchemy.org/en/13/orm/loading_columns.html#load-only-and-wildcard-options
   
   Do you think this is worth it or not worth it?
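
   As a rough illustration of what fetching only the needed columns buys, here is a raw-SQL analogue using the stdlib `sqlite3` module; the schema, column names, and rows are made up for the sketch, not Airflow's real `task_instance` table:

   ```python
   import sqlite3

   # Simplified stand-in table: the real task_instance table has many more
   # columns, which is what makes selecting all of them wasteful here.
   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE task_instance ("
       "task_id TEXT, state TEXT, hostname TEXT, operator TEXT, queued_dttm TEXT)"
   )
   conn.executemany(
       "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
       [
           ("op1", "success", "h1", "BashOperator", None),
           ("op2", "failed", "h1", "BashOperator", None),
           ("op3", "running", "h2", "PythonOperator", None),
       ],
   )

   # Analogue of load_only("task_id", "state"): ask only for the two columns
   # the trigger-rule check reads, restricted to finished states.
   FINISHED = ("success", "failed", "skipped", "upstream_failed")
   placeholders = ",".join("?" * len(FINISHED))
   rows = conn.execute(
       f"SELECT task_id, state FROM task_instance WHERE state IN ({placeholders})",
       FINISHED,
   ).fetchall()
   print(rows)  # → [('op1', 'success'), ('op2', 'failed')]
   ```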
   




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346967221
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -695,6 +695,7 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 # update the state of the previously active dag runs
 dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
 active_dag_runs = []
+dag_run_finished_ti_map = {}
 
 Review comment:
   I don't think we need to store this collection - it doesn't seem we look at this again, so the existing `finished_task` being inside the per-dagrun loop is enough?




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346968461
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti the ti that we want to calculate deps for
+:type ti airflow.models.TaskInstance
+:param finished_tasks all the finished tasks of the dag_run
 
 Review comment:
   ```suggestion
   :param finished_tasks: all the finished tasks of the dag_run
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346965529
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -258,11 +258,13 @@ def get_previous_scheduled_dagrun(self, session=None):
 ).first()
 
 @provide_session
-def update_state(self, session=None):
+def update_state(self, session=None, finished_tasks=None):
 """
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks of this run
+:type finished_tasks: list of airflow.models.TaskInstance
 
 Review comment:
   ```suggestion
   :type finished_tasks: list[airflow.models.TaskInstance]
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-15 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r346965619
 
 

 ##
 File path: airflow/ti_deps/dep_context.py
 ##
 @@ -67,6 +67,8 @@ class DepContext:
 :type ignore_task_deps: bool
 :param ignore_ti_state: Ignore the task instance's previous failure/success
 :type ignore_ti_state: bool
+:param finished_tasks: A list of all the finished tasks of this run
+:type finished_tasks: list of airflow.models.TaskInstance
 
 Review comment:
   ```suggestion
   :type finished_tasks: list[airflow.models.TaskInstance]
   ```




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-11 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r344830363
 
 

 ##
 File path: airflow/utils/state.py
 ##
 @@ -99,6 +99,7 @@ def finished(cls):
 cls.SUCCESS,
 cls.FAILED,
 cls.SKIPPED,
+cls.UPSTREAM_FAILED
 
 Review comment:
   The doc for this list says:
   
   > A list of states indicating that a task started and completed a
   > run attempt.
   
   Tasks in upstream_failed fail at the first hurdle: they have never been started.
   
   Regardless of whether this change is right in the long run, it falls well outside the scope of the PR title, "collected trigger rule dep check per dag run", and into a behaviour change.




[GitHub] [airflow] ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-05 Thread GitBox
ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r342738653
 
 

 ##
 File path: airflow/utils/state.py
 ##
 @@ -99,6 +99,7 @@ def finished(cls):
 cls.SUCCESS,
 cls.FAILED,
 cls.SKIPPED,
+cls.UPSTREAM_FAILED
 
 Review comment:
   This is wrong. A task in UPSTREAM_FAILED has never been attempted.

