uranusjr commented on code in PR #33903: URL: https://github.com/apache/airflow/pull/33903#discussion_r1311209892
########## airflow/ti_deps/deps/trigger_rule_dep.py: ##########
@@ -102,85 +102,247 @@ def _get_dep_statuses(
         if ti.task.trigger_rule == TR.ALWAYS:
             yield self._passing_status(reason="The task had a always trigger rule set.")
             return
+        if not ti.task.is_teardown:
+            # a teardown cannot have any indirect setups
+            relevant_setups = {t.task_id: t for t in ti.task.get_upstreams_only_setups()}
+            if relevant_setups:
+                for status, changed in self._evaluate_setup_constraint(
+                    ti=ti, relevant_setups=relevant_setups, dep_context=dep_context, session=session
+                ):
+                    yield status
+                    if not status.passed and changed:
+                        # no need to evaluate trigger rule; we've already marked as skipped or failed
+                        return
         yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session)

-    def _evaluate_trigger_rule(
+    @staticmethod
+    @functools.lru_cache
+    def _get_relevant_upstream_map_indexes(*, ti: TaskInstance, upstream_task, session) -> int | range | None:

Review Comment:
   I wonder whether we can actually cache this globally. Trigger rules are evaluated repeatedly by the scheduler, in every loop iteration, and the result of this function may change depending on upstream states. The cache decorator was fine when this logic was function-local (each call got a fresh cache), but now that the function is on the class the cache is process-wide, and I'm not so sure that is safe.
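   A minimal sketch of the concern (hypothetical names, not Airflow code): `functools.lru_cache` keys only on the function's arguments, so if the result also depends on external mutable state, such as upstream task-instance states read through a session, the cache keeps returning the value computed from the old state.

```python
import functools

# Stand-in for mutable scheduler-side state (e.g. upstream TI states in the DB).
upstream_state = {"setup_task": "running"}

@functools.lru_cache
def relevant_map_indexes(task_id: str):
    # The result depends on upstream_state, which lru_cache cannot see:
    # the cache key is only task_id.
    return (task_id, upstream_state[task_id])

first = relevant_map_indexes("setup_task")   # computed: ('setup_task', 'running')
upstream_state["setup_task"] = "success"     # upstream state changes...
second = relevant_map_indexes("setup_task")  # cache hit: still 'running' (stale)
print(first, second)
```

   A function-local cache (created inside the calling function and discarded when it returns) avoids this, which is why the move from a local helper to a class-level `@staticmethod` changes the safety of the decorator.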