uranusjr commented on code in PR #33903:
URL: https://github.com/apache/airflow/pull/33903#discussion_r1311209892


##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -102,85 +102,247 @@ def _get_dep_statuses(
         if ti.task.trigger_rule == TR.ALWAYS:
             yield self._passing_status(reason="The task had a always trigger 
rule set.")
             return
+        if not ti.task.is_teardown:
+            # a teardown cannot have any indirect setups
+            relevant_setups = {t.task_id: t for t in 
ti.task.get_upstreams_only_setups()}
+            if relevant_setups:
+                for status, changed in self._evaluate_setup_constraint(
+                    ti=ti, relevant_setups=relevant_setups, 
dep_context=dep_context, session=session
+                ):
+                    yield status
+                    if not status.passed and changed:
+                        # no need to evaluate trigger rule; we've already 
marked as skipped or failed
+                        return
         yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, 
session=session)
 
-    def _evaluate_trigger_rule(
+    @staticmethod
+    @functools.lru_cache
+    def _get_relevant_upstream_map_indexes(*, ti: TaskInstance, upstream_task, 
session) -> int | range | None:

Review Comment:
   I wonder if we can actually cache this globally; a trigger rule is accessed 
repeatedly in the schedule in every loop iteration, and the result of this 
function may change depending on upstream states. We could add the cache 
decorator because the function was function-local previously, but now it’s on 
the class I’m not so sure.



##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -102,85 +102,247 @@ def _get_dep_statuses(
         if ti.task.trigger_rule == TR.ALWAYS:
             yield self._passing_status(reason="The task had a always trigger 
rule set.")
             return
+        if not ti.task.is_teardown:
+            # a teardown cannot have any indirect setups
+            relevant_setups = {t.task_id: t for t in 
ti.task.get_upstreams_only_setups()}
+            if relevant_setups:
+                for status, changed in self._evaluate_setup_constraint(
+                    ti=ti, relevant_setups=relevant_setups, 
dep_context=dep_context, session=session
+                ):
+                    yield status
+                    if not status.passed and changed:
+                        # no need to evaluate trigger rule; we've already 
marked as skipped or failed
+                        return
         yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, 
session=session)
 
-    def _evaluate_trigger_rule(
+    @staticmethod
+    @functools.lru_cache
+    def _get_relevant_upstream_map_indexes(*, ti: TaskInstance, upstream_task, 
session) -> int | range | None:

Review Comment:
   I wonder if we can actually cache this globally; a trigger rule is accessed 
repeatedly in the schedule in every loop iteration, and the result of this 
function may change depending on upstream states. We could add the cache 
decorator because the function was function-local previously, but now it’s on 
the class I’m not so sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to