Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966027686

## task_sdk/src/airflow/sdk/definitions/baseoperator.py: ##
@@ -1418,6 +1418,11 @@ def inherits_from_empty_operator(self):
         # of its subclasses (which don't inherit from anything but BaseOperator).
         return getattr(self, "_is_empty", False)
+    @property
+    def inherits_from_skip_mixin(self):
+        """Used to determine if an Operator is inherited from SkipMixin."""
+        return getattr(self, "_is_skip_mixin", False)

Review Comment:
   Done, had to add equivalent definitions for the decorators (otherwise DAG proc. gets mad)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966025219

## airflow/serialization/serialized_objects.py: ##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
         # Used to determine if an Operator is inherited from EmptyOperator
         serialize_op["_is_empty"] = op.inherits_from_empty_operator
+        serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   Your argument makes sense. Shall we settle on `is_skippable`? `is_conditional` does indeed sound too generic.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966013055

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):
+            if parent.inherits_from_skip_mixin is True:
                 if parent.task_id not in finished_task_ids:
                     # This can happen if the parent task has not yet run.
                     continue
-                prev_result = ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session)
+                # TODO: Use XCom.deserialize_value instead (requires some additional adjustments)
+                import json
+                prev_result = json.loads(ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session) or "{}")

Review Comment:
   It seems to work now, so I removed the `json.loads`. I added the TODO comment though.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1965655841

## newsfragments/aip-72.significant.rst: ##
@@ -42,6 +42,9 @@ As part of this change the following breaking changes have occurred:
   It is recommended that you replace such a custom operator with a deferrable sensor, a condition or another triggering mechanism.
+- The following operators subclassing ``SkipMixin`` have been adjusted to use the ``TaskExecution`` interface:
+
+  - ``airflow.standard.python.ShortCircuitOperator``

Review Comment:
   Good call, I'll create a separate newsfragment at least for the part which is responsible for fixing short circuit in mapped tasks (https://github.com/apache/airflow/pull/44925)
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952393955

## newsfragments/aip-72.significant.rst: ##
@@ -42,6 +42,9 @@ As part of this change the following breaking changes have occurred:
   It is recommended that you replace such a custom operator with a deferrable sensor, a condition or another triggering mechanism.
+- The following operators subclassing ``SkipMixin`` have been adjusted to use the ``TaskExecution`` interface:
+
+  - ``airflow.standard.python.ShortCircuitOperator``

Review Comment:
   Do users care about this? It doesn't actually change anything for them, does it?
   ```suggestion
   ```
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952401465

## airflow/models/skipmixin.py: ##
@@ -120,23 +74,11 @@ def skip(
             return
         task_ids_list = [d.task_id for d in task_list]
-        SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   And yeah, I think we should do that. The TIDep skipping again is, I think, there to catch the case where things get cleared but the previous skip result is maintained (see #7276), but we should still skip tasks immediately in the API server handling path.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952396335

## task_sdk/src/airflow/sdk/definitions/baseoperator.py: ##
@@ -1418,6 +1418,11 @@ def inherits_from_empty_operator(self):
         # of its subclasses (which don't inherit from anything but BaseOperator).
         return getattr(self, "_is_empty", False)
+    @property
+    def inherits_from_skip_mixin(self):
+        """Used to determine if an Operator is inherited from SkipMixin."""
+        return getattr(self, "_is_skip_mixin", False)

Review Comment:
   If you put this on AbstractOperator then you don't need to change MappedOperator as much (just define the alias/attribute)
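A minimal sketch of the suggestion above, assuming a shared base class that both regular and mapped operators inherit from. The class names here (`AbstractOperatorSketch`, `MappedSketch`, and so on) are hypothetical stand-ins, not Airflow's real classes; only the `inherits_from_skip_mixin` / `_is_skip_mixin` names come from the diff.

```python
class AbstractOperatorSketch:
    """Hypothetical stand-in for a base class shared by all operator kinds."""

    @property
    def inherits_from_skip_mixin(self) -> bool:
        # Operators that can skip downstream tasks expose `_is_skip_mixin = True`;
        # everything else falls back to False.
        return getattr(self, "_is_skip_mixin", False)


class PlainSketch(AbstractOperatorSketch):
    """An operator with no skipping behaviour."""


class ShortCircuitSketch(AbstractOperatorSketch):
    """An operator that may skip its downstream tasks."""

    _is_skip_mixin = True


class MappedSketch(AbstractOperatorSketch):
    """Stand-in for a mapped operator that wraps another operator class."""

    def __init__(self, operator_class: type) -> None:
        self.operator_class = operator_class

    @property
    def _is_skip_mixin(self) -> bool:
        # The "alias": defer to the wrapped operator class, so the shared
        # property on the base class needs no mapped-specific override.
        return getattr(self.operator_class, "_is_skip_mixin", False)
```

With the property defined once on the base, the mapped variant only has to supply the attribute it reads.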
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952390810

## airflow/serialization/serialized_objects.py: ##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
         # Used to determine if an Operator is inherited from EmptyOperator
         serialize_op["_is_empty"] = op.inherits_from_empty_operator
+        serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   Looking at how it's used, `if parent.inherits_from_skip_mixin`, I think `if parent.is_conditional` would work there. That `is_conditional` name _might_ be a little too generic though.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952392759

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):
+            if parent.inherits_from_skip_mixin is True:
                 if parent.task_id not in finished_task_ids:
                     # This can happen if the parent task has not yet run.
                     continue
-                prev_result = ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session)
+                # TODO: Use XCom.deserialize_value instead (requires some additional adjustments)
+                import json
+                prev_result = json.loads(ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session) or "{}")

Review Comment:
   `xcom_pull` should already deserialize, but there's a bug where we are double encoding things. Please leave a todo comment linking to #45231 here. cc @amoghrajesh in case this gets merged before you fix that.
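To illustrate the double-encoding problem described above, here is a small, self-contained sketch of a tolerant decoder. The helper name `decode_possibly_double_encoded` is hypothetical and is not part of Airflow; it only shows why the temporary `json.loads(... or "{}")` workaround in the diff had the effect it did.

```python
import json
from typing import Any


def decode_possibly_double_encoded(raw: Any) -> Any:
    """Return a decoded value whether or not `raw` was JSON-encoded again.

    Hypothetical helper: a correctly deserialized value passes through
    unchanged, while a value that is still a JSON string (the double-encoding
    case) is decoded until it is no longer a string.
    """
    if not raw:
        # Mirrors the `or "{}"` fallback in the diff: nothing stored yet.
        return {}
    while isinstance(raw, str):
        raw = json.loads(raw)
    return raw
```

Once the underlying bug is fixed, the pass-through branch is the only one exercised, which matches the later remark in this thread that the explicit `json.loads` could be removed.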
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952389770

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):
+            if parent.inherits_from_skip_mixin is True:

Review Comment:
   Nit:
   ```suggestion
               if parent.inherits_from_skip_mixin:
   ```
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952389279

## airflow/serialization/serialized_objects.py: ##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
         # Used to determine if an Operator is inherited from EmptyOperator
         serialize_op["_is_empty"] = op.inherits_from_empty_operator
+        serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   I wonder if we could call this `_is_branching` or `_is_conditional` or something. Thinking ahead a bit: if we have DAGs defined in other languages (be it YAML or anything else), I'm not sure naming it after `skip_mixin` makes sense anymore.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952387149

## airflow/models/skipmixin.py: ##
@@ -120,23 +74,11 @@ def skip(
             return
         task_ids_list = [d.task_id for d in task_list]
-        SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   The TaskSDK could make an API call dedicated to handling skipping if we want.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951582645

## airflow/models/skipmixin.py: ##
@@ -120,23 +74,11 @@ def skip(
             return
         task_ids_list = [d.task_id for d in task_list]
-        SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   I wonder if we even need to preserve this functionality. Overall it looks like a shortcut to set tasks to skipped before `NotPreviouslySkippedDep` handles it (breaks single responsibility?), and it surely won't work when there's no direct access to the DB from the operator.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951582645

## airflow/models/skipmixin.py: ##
@@ -120,23 +74,11 @@ def skip(
             return
         task_ids_list = [d.task_id for d in task_list]
-        SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   I wonder if we even need to preserve this functionality. It surely won't work when there's no direct access to the DB from the operator, and overall it looks like a shortcut to set tasks to skipped before `NotPreviouslySkippedDep` handles it (breaks single responsibility?).
   If we decide to preserve it, we'll need to signal the task runner with exceptions.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951576521

## airflow/models/skipmixin.py: ##
@@ -120,8 +120,13 @@ def skip(
             return

Review Comment:
   Irrelevant for now
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951575450

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):

Review Comment:
   Implemented the flag as we discussed. When working on the tests I'll check that there are no issues with mapped operators.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951575706

## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
             msg = early_exit
             return msg
-        result = _execute_task(context, ti)
+
+        try:

Review Comment:
   Reverted
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951574379

## airflow/exceptions.py: ##
@@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException):
     """Raise when multiple values are found for the same connection ID."""
+class SkipDownstreamTaskInstances(BaseException):

Review Comment:
   Became irrelevant
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951574100

## providers/common/compat/src/airflow/providers/common/compat/standard/short_circuit.py: ##
@@ -0,0 +1,36 @@
+import logging
+
+
+def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool):
+    """
+    Compatibility function for short-circuiting tasks.
+    In Airflow v2, it will be utilized by the ShortCircuitOperator.
+    In Airflow v3+, it will be utilized by the task_runner.
+    """

Review Comment:
   Became irrelevant
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951573740

## providers/standard/src/airflow/providers/standard/operators/python.py: ##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) ->
     def execute(self, context: Context) -> Any:
         condition = super().execute(context)
-        self.log.info("Condition result is %s", condition)
-
-        if condition:
-            self.log.info("Proceeding with downstream tasks...")
-            return condition
-
-        if not self.downstream_task_ids:
-            self.log.info("No downstream tasks; nothing to do.")
-            return condition
-
-        dag_run = context["dag_run"]
-
-        def get_tasks_to_skip():
-            if self.ignore_downstream_trigger_rules is True:
-                tasks = context["task"].get_flat_relatives(upstream=False)
-            else:
-                tasks = context["task"].get_direct_relatives(upstream=False)
-            for t in tasks:
-                if not t.is_teardown:
-                    yield t
-
-        to_skip = get_tasks_to_skip()
-
-        # this let's us avoid an intermediate list unless debug logging
-        if self.log.getEffectiveLevel() <= logging.DEBUG:
-            self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip()))
-
-        self.log.info("Skipping downstream tasks")
         if AIRFLOW_V_3_0_PLUS:
-            self.skip(
-                dag_id=dag_run.dag_id,
-                run_id=dag_run.run_id,
-                tasks=to_skip,
-                map_index=context["ti"].map_index,
-            )
+            raise SkipDownstreamTaskInstances(condition=condition,
+                                              ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+                                              )

Review Comment:
   Indeed it works fine, so I reverted to the original state.
   I'll raise a discussion about it on the dev list (and maybe even in the next dev call), so we can all discuss it publicly.
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951451483

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):

Review Comment:
   ~If we retain it - we also need to check whether it's a [mapped operator](https://github.com/apache/airflow/pull/44925#discussion_r1884624755), otherwise we have issues in that area~
   Need to double check this statement
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951451483

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):

Review Comment:
   If we retain it - we also need to check whether it's a [mapped operator](https://github.com/apache/airflow/pull/44925#discussion_r1884624755), otherwise we have issues in that area
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948851232

## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
             msg = early_exit
             return msg
-        result = _execute_task(context, ti)
+
+        try:

Review Comment:
   > Can we handle this at the same level as `except TaskDeferred as defer:` on L583?

   I agree that it looks a bit funny. I did it to avoid duplicating the logic below the nested `try...except`.
   I'll wrap it in another function, and then it might look a bit better.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948803054

## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
             msg = early_exit
             return msg
-        result = _execute_task(context, ti)
+
+        try:

Review Comment:
   Can we handle this at the same level as `except TaskDeferred as defer:` on L583?
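A rough sketch of what "handling it at the same level" could look like: one `try` with sibling `except` clauses instead of a nested `try...except`. The exception classes and the `run_once` function below are hypothetical stand-ins written for this illustration; they are not the task runner's actual code, and the string results only mark which branch ran.

```python
from typing import Any, Callable


class DeferSignal(BaseException):
    """Hypothetical stand-in for a 'task deferred' control-flow exception."""


class SkipDownstreamSignal(BaseException):
    """Hypothetical stand-in for a 'skip downstream tasks' control-flow exception."""


def run_once(execute: Callable[[], Any]) -> str:
    """Run `execute` and map each control-flow signal to an outcome label."""
    try:
        execute()
    except DeferSignal:
        # Sibling handler: deferral is handled here...
        return "deferred"
    except SkipDownstreamSignal:
        # ...and skipping is handled at the same level, with no nested try.
        return "skip_downstream"
    return "success"
```

Keeping both handlers on one `try` avoids duplicating the code that follows the call, which was the concern raised about the nested version.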
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948796811

## airflow/exceptions.py: ##
@@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException):
     """Raise when multiple values are found for the same connection ID."""
+class SkipDownstreamTaskInstances(BaseException):

Review Comment:
   Yes, may as well do that now since it's a new one.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948041123

## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
             msg = early_exit
             return msg
-        result = _execute_task(context, ti)
+
+        try:

Review Comment:
   This double/nested try-except looks weird
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948040857

## providers/standard/src/airflow/providers/standard/operators/python.py: ##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) ->
     def execute(self, context: Context) -> Any:
         condition = super().execute(context)
-        self.log.info("Condition result is %s", condition)
-
-        if condition:
-            self.log.info("Proceeding with downstream tasks...")
-            return condition
-
-        if not self.downstream_task_ids:
-            self.log.info("No downstream tasks; nothing to do.")
-            return condition
-
-        dag_run = context["dag_run"]
-
-        def get_tasks_to_skip():
-            if self.ignore_downstream_trigger_rules is True:
-                tasks = context["task"].get_flat_relatives(upstream=False)
-            else:
-                tasks = context["task"].get_direct_relatives(upstream=False)
-            for t in tasks:
-                if not t.is_teardown:
-                    yield t
-
-        to_skip = get_tasks_to_skip()
-
-        # this let's us avoid an intermediate list unless debug logging
-        if self.log.getEffectiveLevel() <= logging.DEBUG:
-            self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip()))
-
-        self.log.info("Skipping downstream tasks")
         if AIRFLOW_V_3_0_PLUS:
-            self.skip(
-                dag_id=dag_run.dag_id,
-                run_id=dag_run.run_id,
-                tasks=to_skip,
-                map_index=context["ti"].map_index,
-            )
+            raise SkipDownstreamTaskInstances(condition=condition,
+                                              ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+                                              )

Review Comment:
   Moving it makes sense, but operators can still set XComs: `ti.xcom_push` works fine.
Re: [PR] [WIP] Implement short circuit operator [airflow]
ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948040226

## airflow/ti_deps/deps/not_previously_skipped_dep.py: ##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
         finished_task_ids = {t.task_id for t in finished_tis}
         for parent in upstream:
-            if isinstance(parent, SkipMixin):

Review Comment:
   Removing this makes the scheduler check this for all operators, where it didn't before. Given this is in the hottest of hot paths in the scheduler, we might need to rethink this: perhaps we store a flag in the serialized DAG like we did for `is_empty`.
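A minimal sketch of the "store a flag in the serialized DAG" idea, so the hot path only reads a boolean instead of inspecting every upstream operator. Everything here is a simplified stand-in: `OpSketch`, `serialize_node_sketch` and `parents_to_inspect` are hypothetical names, and only the `_is_skip_mixin` key mirrors the diff elsewhere in this thread.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class OpSketch:
    """Hypothetical, heavily simplified operator."""

    task_id: str
    can_skip_downstream: bool = False


def serialize_node_sketch(op: OpSketch) -> dict[str, Any]:
    # The flag is computed once, at serialization time.
    return {"task_id": op.task_id, "_is_skip_mixin": op.can_skip_downstream}


def parents_to_inspect(
    serialized_parents: list[dict[str, Any]], finished_task_ids: set[str]
) -> list[str]:
    """Return the finished parents whose skip result would need checking."""
    return [
        parent["task_id"]
        for parent in serialized_parents
        # Cheap boolean lookup first; parents without the flag are ignored.
        if parent.get("_is_skip_mixin", False)
        # A parent that has not finished yet has no result to look at.
        and parent["task_id"] in finished_task_ids
    ]
```

Only flagged parents that have already finished survive the filter, so the per-parent cost for ordinary operators is a single dictionary lookup.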
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935272

## providers/standard/src/airflow/providers/standard/operators/python.py: ##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) ->
     def execute(self, context: Context) -> Any:
         condition = super().execute(context)
-        self.log.info("Condition result is %s", condition)
-
-        if condition:
-            self.log.info("Proceeding with downstream tasks...")
-            return condition
-
-        if not self.downstream_task_ids:
-            self.log.info("No downstream tasks; nothing to do.")
-            return condition
-
-        dag_run = context["dag_run"]
-
-        def get_tasks_to_skip():
-            if self.ignore_downstream_trigger_rules is True:
-                tasks = context["task"].get_flat_relatives(upstream=False)
-            else:
-                tasks = context["task"].get_direct_relatives(upstream=False)
-            for t in tasks:
-                if not t.is_teardown:
-                    yield t
-
-        to_skip = get_tasks_to_skip()
-
-        # this let's us avoid an intermediate list unless debug logging
-        if self.log.getEffectiveLevel() <= logging.DEBUG:
-            self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip()))
-
-        self.log.info("Skipping downstream tasks")
         if AIRFLOW_V_3_0_PLUS:
-            self.skip(
-                dag_id=dag_run.dag_id,
-                run_id=dag_run.run_id,
-                tasks=to_skip,
-                map_index=context["ti"].map_index,
-            )
+            raise SkipDownstreamTaskInstances(condition=condition,
+                                              ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+                                              )

Review Comment:
   I think that it makes more sense in the new abstraction to handle the "skipping" logic in the task runner (or somewhere else in the task sdk?) rather than the standard operator itself, since the operator cannot set XComs directly.
   The operator runs some logic, which results in a boolean value. Upon signaling the task runner, the latter would handle setting the xcom with a list of all affected tasks.
   It already has all of the required information for that, except for the condition (which dictates whether to do it at all), and the param. `ignore_downstream_trigger_rules`.
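The selection logic being moved around in this diff can be sketched independently of Airflow. The function below is a hypothetical, simplified version that works on a plain adjacency mapping rather than real task objects; it follows the removed `get_tasks_to_skip` code in spirit (all downstream tasks versus direct ones only, never teardown tasks), but the graph representation and the names are assumptions made for this example.

```python
from collections import deque


def tasks_to_skip(
    task_id: str,
    downstream: dict[str, list[str]],
    teardown_ids: set[str],
    ignore_downstream_trigger_rules: bool,
) -> list[str]:
    """Return the sorted task IDs that a false condition would skip."""
    if ignore_downstream_trigger_rules:
        # Every transitive downstream task, found with a breadth-first walk.
        candidates: set[str] = set()
        queue = deque(downstream.get(task_id, []))
        while queue:
            current = queue.popleft()
            if current in candidates:
                continue
            candidates.add(current)
            queue.extend(downstream.get(current, []))
    else:
        # Only the direct children; their trigger rules decide the rest.
        candidates = set(downstream.get(task_id, []))
    # Teardown tasks are never skipped.
    return sorted(candidates - teardown_ids)
```

In this sketch the caller supplies the two inputs named in the comment above: whether the condition was false (which decides if the function is called at all) and the value of `ignore_downstream_trigger_rules`.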
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935272 ## providers/standard/src/airflow/providers/standard/operators/python.py: ## @@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> def execute(self, context: Context) -> Any: condition = super().execute(context) -self.log.info("Condition result is %s", condition) - -if condition: -self.log.info("Proceeding with downstream tasks...") -return condition - -if not self.downstream_task_ids: -self.log.info("No downstream tasks; nothing to do.") -return condition - -dag_run = context["dag_run"] - -def get_tasks_to_skip(): -if self.ignore_downstream_trigger_rules is True: -tasks = context["task"].get_flat_relatives(upstream=False) -else: -tasks = context["task"].get_direct_relatives(upstream=False) -for t in tasks: -if not t.is_teardown: -yield t - -to_skip = get_tasks_to_skip() - -# this let's us avoid an intermediate list unless debug logging -if self.log.getEffectiveLevel() <= logging.DEBUG: -self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) - -self.log.info("Skipping downstream tasks") if AIRFLOW_V_3_0_PLUS: -self.skip( -dag_id=dag_run.dag_id, -run_id=dag_run.run_id, -tasks=to_skip, -map_index=context["ti"].map_index, -) +raise SkipDownstreamTaskInstances(condition=condition, + ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules + ) Review Comment: I think that it makes more sense in the new abstraction to handle the "skipping" logic in the task runner rather than the standard operator itself, since the operator cannot set XComs directly. The operator runs some logic which results in a boolean value, and upon signaling the task runner - the latter would handle setting the xcom with a list of all affected tasks. 
It already has all of the required information for that, except for the condition (which dictates whether to do it at all) and the configuration `ignore_downstream_trigger_rules`.
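A minimal sketch of the flow described in this comment, not the PR's actual implementation: the operator only evaluates the condition and raises a signal, and the runner decides which downstream tasks to record. `SkipDownstreamTaskInstances` is redefined here as a toy stand-in for the class in the diff; `short_circuit_execute`, `run_task`, and the `skipped_task_ids` key are hypothetical names, and the downstream ID lists are assumed to be already known to the runner.

```python
class SkipDownstreamTaskInstances(BaseException):
    """Toy stand-in for the exception in the diff; carries only the two inputs the runner lacks."""

    def __init__(self, condition, ignore_downstream_trigger_rules):
        super().__init__()
        self.condition = condition
        self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules


def short_circuit_execute(python_callable, ignore_downstream_trigger_rules=True):
    """Operator side: evaluate the callable, then hand the decision to the runner."""
    condition = python_callable()
    raise SkipDownstreamTaskInstances(
        condition=condition,
        ignore_downstream_trigger_rules=ignore_downstream_trigger_rules,
    )


def run_task(execute, direct_downstream_ids, all_downstream_ids):
    """Runner side (hypothetical): catch the signal and record which tasks to skip."""
    xcom = {}  # stands in for whatever store the runner would write to
    try:
        execute()
    except SkipDownstreamTaskInstances as signal:
        if not signal.condition:
            if signal.ignore_downstream_trigger_rules:
                chosen = all_downstream_ids
            else:
                chosen = direct_downstream_ids
            xcom["skipped_task_ids"] = sorted(chosen)
    return xcom
```

With this split the operator never touches XComs itself; it only supplies the condition and the `ignore_downstream_trigger_rules` setting.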
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935668 ## providers/common/compat/src/airflow/providers/common/compat/standard/short_circuit.py: ## @@ -0,0 +1,36 @@ +import logging + + +def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool): +""" +Compatibility function for short-circuiting tasks. +In Airflow v2, it will be utilized by the ShortCircuitOperator. +In Airflow v3+, it will be utilized by the task_runner. +""" Review Comment: Important to preserve backward compatibility, if my idea for moving the logic to the task runner is accepted
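The body of `get_tasks_to_skip` is not visible in this hunk, so the following is only a guess at its shape, reconstructed from the logic removed from `ShortCircuitOperator.execute` elsewhere in this PR. `ToyTask` is a hypothetical stand-in for an operator; only the signature and the `get_flat_relatives` / `get_direct_relatives` / `is_teardown` names come from the diffs.

```python
import logging


class ToyTask:
    """Hypothetical stand-in for an operator; exposes only what the helper needs."""

    def __init__(self, task_id, is_teardown=False):
        self.task_id = task_id
        self.is_teardown = is_teardown
        self.children = []

    def get_direct_relatives(self, upstream=False):
        # Only the downstream direction is modelled in this toy class.
        return list(self.children)

    def get_flat_relatives(self, upstream=False):
        # Walk every transitive downstream task exactly once.
        found, stack = {}, list(self.children)
        while stack:
            t = stack.pop()
            if t.task_id not in found:
                found[t.task_id] = t
                stack.extend(t.children)
        return list(found.values())


def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool):
    """Return the downstream tasks to skip, or an empty list if nothing should be skipped."""
    if condition:
        log.info("Proceeding with downstream tasks...")
        return []
    if not downstream_task_ids:
        log.info("No downstream tasks; nothing to do.")
        return []
    if ignore_downstream_trigger_rules:
        candidates = task.get_flat_relatives(upstream=False)
    else:
        candidates = task.get_direct_relatives(upstream=False)
    # Teardown tasks are never skipped, matching the removed operator code.
    return [t for t in candidates if not t.is_teardown]
```

Because the helper takes `task` and `log` as plain arguments, the same function could be called from the operator on Airflow 2 and from the task runner on Airflow 3, which is the compatibility goal the docstring states.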
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936403 ## newsfragments/44925.bugfix.rst: ## @@ -0,0 +1 @@ +Fix short circuit operator in mapped tasks Review Comment: I'll fix it later
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936092 ## airflow/models/skipmixin.py: ## @@ -120,8 +120,13 @@ def skip( return Review Comment: If my suggestion for relocating the "skipping" logic to the task_runner makes sense, I think that both `SkipMixin` and `BranchMixin` will eventually be deleted (you can ignore the changes here; they come from #44925).
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936146 ## airflow/exceptions.py: ## @@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException): """Raise when multiple values are found for the same connection ID.""" +class SkipDownstreamTaskInstances(BaseException): Review Comment: Should I relocate this exception to the SDK? (the other exceptions here are also used there)
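Wherever the exception ends up living, the `BaseException` base class shown in the diff has one practical effect that a short sketch can illustrate: a broad `except Exception` in user code will not swallow the signal before it reaches the runner. The class and function names below are hypothetical and exist only for this illustration.

```python
class SkipSignal(BaseException):
    """Hypothetical control-flow signal, mirroring the BaseException base in the diff."""


class OrdinaryError(Exception):
    """Regular error type, for comparison."""


def user_code(exc_type):
    # A typical catch-all handler inside a user callable.
    try:
        raise exc_type()
    except Exception:
        return "swallowed by user code"


def runner(exc_type):
    # The runner sits one level up and handles the signal explicitly.
    try:
        return user_code(exc_type)
    except SkipSignal:
        return "reached the runner"
```

Here `runner(OrdinaryError)` is handled inside `user_code`, while `runner(SkipSignal)` passes through the `except Exception` clause and is caught by the runner.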
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936092 ## airflow/models/skipmixin.py: ## @@ -120,8 +120,13 @@ def skip( return Review Comment: If my suggestion for relocating the "skipping" logic to the task_runner makes sense, I think that this Mixin will eventually be deleted (you can ignore the changes here; they come from #44925).
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935668 ## providers/common/compat/src/airflow/providers/common/compat/standard/short_circuit.py: ## @@ -0,0 +1,36 @@ +import logging + + +def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool): +""" +Compatibility function for short-circuiting tasks. +In Airflow v2, it will be utilized by the ShortCircuitOperator. +In Airflow v3+, it will be utilized by the task_runner. +""" Review Comment: Important to preserve backward compatibility
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935272 ## providers/standard/src/airflow/providers/standard/operators/python.py: ## @@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> def execute(self, context: Context) -> Any: condition = super().execute(context) -self.log.info("Condition result is %s", condition) - -if condition: -self.log.info("Proceeding with downstream tasks...") -return condition - -if not self.downstream_task_ids: -self.log.info("No downstream tasks; nothing to do.") -return condition - -dag_run = context["dag_run"] - -def get_tasks_to_skip(): -if self.ignore_downstream_trigger_rules is True: -tasks = context["task"].get_flat_relatives(upstream=False) -else: -tasks = context["task"].get_direct_relatives(upstream=False) -for t in tasks: -if not t.is_teardown: -yield t - -to_skip = get_tasks_to_skip() - -# this let's us avoid an intermediate list unless debug logging -if self.log.getEffectiveLevel() <= logging.DEBUG: -self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) - -self.log.info("Skipping downstream tasks") if AIRFLOW_V_3_0_PLUS: -self.skip( -dag_id=dag_run.dag_id, -run_id=dag_run.run_id, -tasks=to_skip, -map_index=context["ti"].map_index, -) +raise SkipDownstreamTaskInstances(condition=condition, + ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules + ) Review Comment: I think that it makes more sense in the new abstraction to handle the "skipping" logic in the task runner rather than the standard operator itself, since the operator cannot set XComs directly. The operator runs some logic which results in a boolean value, and once it signals the task runner, the latter would handle setting the XCom with a list of all affected tasks (it already has all of the required information for that, except for the condition and the configuration).
Re: [PR] [WIP] Implement short circuit operator [airflow]
shahar1 commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1947835723 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -564,11 +565,27 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None: msg = early_exit return msg -result = _execute_task(context, ti) +# TODO: looks a bit forced, couldn't come up with a better way that won't require a lot of changes Review Comment: If you have any ideas for implementing this in a better way given the existing limitations, I'd be happy to hear suggestions