Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-21 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966027686


##
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##
@@ -1418,6 +1418,11 @@ def inherits_from_empty_operator(self):
 # of its subclasses (which don't inherit from anything but 
BaseOperator).
 return getattr(self, "_is_empty", False)
 
+@property
+def inherits_from_skip_mixin(self):
+"""Used to determine if an Operator is inherited from SkipMixin."""
+return getattr(self, "_is_skip_mixin", False)

Review Comment:
   Done. I had to add equivalent definitions for the decorators (otherwise the 
DAG processor complains).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-21 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966025219


##
airflow/serialization/serialized_objects.py:
##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | 
MappedOperator) -> dict[str, Any]:
 
 # Used to determine if an Operator is inherited from EmptyOperator
 serialize_op["_is_empty"] = op.inherits_from_empty_operator
+serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   Your argument makes sense, let's settle for `is_skippable`?
   `is_conditional` sounds indeed too generic.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-21 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1966013055


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):
+if parent.inherits_from_skip_mixin is True:
 if parent.task_id not in finished_task_ids:
 # This can happen if the parent task has not yet run.
 continue
 
-prev_result = ti.xcom_pull(task_ids=parent.task_id, 
key=XCOM_SKIPMIXIN_KEY, session=session)
+# TODO: Use XCom.deserialize_value instead (requires some 
additional adjustments)
+import json
+prev_result = json.loads(ti.xcom_pull(task_ids=parent.task_id, 
key=XCOM_SKIPMIXIN_KEY, session=session) or "{}")

Review Comment:
   It seems to work now, so I removed the `json.loads`.
   I added the TODO comment though.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-21 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1965655841


##
newsfragments/aip-72.significant.rst:
##
@@ -42,6 +42,9 @@ As part of this change the following breaking changes have 
occurred:
 
   It is recommended that you replace such a custom operator with a deferrable 
sensor, a condition or another triggering mechanism.
 
+- The following operators subclassing ``SkipMixin`` have been adjusted to use 
the ``TaskExecution`` interface:
+
+  - ``airflow.standard.python.ShortCircuitOperator``

Review Comment:
   Good call, I'll create a separate newsfragment at least for the part which 
is responsible for fixing short circuit in mapped tasks 
(https://github.com/apache/airflow/pull/44925)






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952393955


##
newsfragments/aip-72.significant.rst:
##
@@ -42,6 +42,9 @@ As part of this change the following breaking changes have 
occurred:
 
   It is recommended that you replace such a custom operator with a deferrable 
sensor, a condition or another triggering mechanism.
 
+- The following operators subclassing ``SkipMixin`` have been adjusted to use 
the ``TaskExecution`` interface:
+
+  - ``airflow.standard.python.ShortCircuitOperator``

Review Comment:
   Do users care about this? It doesn't actually change anything for them, 
does it?
   
   ```suggestion
   ```






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952401465


##
airflow/models/skipmixin.py:
##
@@ -120,23 +74,11 @@ def skip(
 return
 
 task_ids_list = [d.task_id for d in task_list]
-SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   And yeah, I think we should do that. I think the TIDep skips again to catch 
the case where things get cleared but the previous skip result is maintained 
(see #7276), but we should still skip tasks immediately in the API server 
handling path.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952396335


##
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##
@@ -1418,6 +1418,11 @@ def inherits_from_empty_operator(self):
 # of its subclasses (which don't inherit from anything but 
BaseOperator).
 return getattr(self, "_is_empty", False)
 
+@property
+def inherits_from_skip_mixin(self):
+"""Used to determine if an Operator is inherited from SkipMixin."""
+return getattr(self, "_is_skip_mixin", False)

Review Comment:
   If you put this on AbstractOperator then you don't need to change 
MappedOperator as much (just define the alias/attribute)
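
   A minimal sketch of that suggestion, using simplified stand-in classes rather 
than the real Airflow ones. The class names `ShortCircuitLike` and the 
`operator_class` argument are assumptions made for illustration: the property 
lives once on the shared base, and each subclass only supplies the 
`_is_skip_mixin` attribute.

```python
class AbstractOperator:
    """Stand-in for the shared base class (not the real Airflow class)."""

    @property
    def inherits_from_skip_mixin(self) -> bool:
        # Defined once here; subclasses only provide `_is_skip_mixin`.
        return getattr(self, "_is_skip_mixin", False)


class BaseOperator(AbstractOperator):
    """Stand-in for a regular operator; no flag set, so the default applies."""


class ShortCircuitLike(BaseOperator):
    """Hypothetical operator that can skip downstream tasks."""

    _is_skip_mixin = True


class MappedOperator(AbstractOperator):
    """Stand-in for a mapped operator wrapping an operator class."""

    def __init__(self, operator_class: type) -> None:
        self.operator_class = operator_class

    @property
    def _is_skip_mixin(self) -> bool:
        # Alias: defer to the wrapped operator class.
        return getattr(self.operator_class, "_is_skip_mixin", False)
```

   With this layout the mapped stand-in needs no copy of the property itself, 
only the alias.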






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952390810


##
airflow/serialization/serialized_objects.py:
##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | 
MappedOperator) -> dict[str, Any]:
 
 # Used to determine if an Operator is inherited from EmptyOperator
 serialize_op["_is_empty"] = op.inherits_from_empty_operator
+serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   Looking at how it's used, `if parent.inherits_from_skip_mixin`, I think `if 
parent.is_conditional` would work there. That `is_conditional` name _might_ be 
a little too generic though.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952392759


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):
+if parent.inherits_from_skip_mixin is True:
 if parent.task_id not in finished_task_ids:
 # This can happen if the parent task has not yet run.
 continue
 
-prev_result = ti.xcom_pull(task_ids=parent.task_id, 
key=XCOM_SKIPMIXIN_KEY, session=session)
+# TODO: Use XCom.deserialize_value instead (requires some 
additional adjustments)
+import json
+prev_result = json.loads(ti.xcom_pull(task_ids=parent.task_id, 
key=XCOM_SKIPMIXIN_KEY, session=session) or "{}")

Review Comment:
   `xcom_pull` should already deserialize, but there's a bug where we are 
double encoding things. Please leave a todo comment linking to #45231 here.
   
   cc @amoghrajesh in case this gets merged before you fix that.
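
   To illustrate the double-encoding problem in isolation, here is a small 
sketch using only the standard `json` module. The payload shape is a made-up 
example, not the actual XCom format: a value encoded twice is still a string 
after one decode, which appears to be why the diff adds an extra `json.loads`.

```python
import json

skipped = {"skipped": ["task_b", "task_c"]}  # hypothetical XCom payload

# The bug described above: the value is JSON-encoded twice before storage.
stored = json.dumps(json.dumps(skipped))

# A reader that decodes once gets a JSON string back, not a dict.
once = json.loads(stored)

# Decoding a second time recovers the original value.
twice = json.loads(once)
```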






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952389770


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,12 +48,14 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):
+if parent.inherits_from_skip_mixin is True:

Review Comment:
   Nit:
   ```suggestion
   if parent.inherits_from_skip_mixin
   ```






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952389279


##
airflow/serialization/serialized_objects.py:
##
@@ -1235,6 +1235,7 @@ def _serialize_node(cls, op: BaseOperator | 
MappedOperator) -> dict[str, Any]:
 
 # Used to determine if an Operator is inherited from EmptyOperator
 serialize_op["_is_empty"] = op.inherits_from_empty_operator
+serialize_op["_is_skip_mixin"] = op.inherits_from_skip_mixin

Review Comment:
   I wonder if we could call this `_is_branching` or `_is_conditional` or 
something -- thinking ahead a bit if we have DAGs defined in other languages 
(be it YAML or anything else) I'm not sure naming it as skip_mixin makes sense 
anymore.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-12 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1952387149


##
airflow/models/skipmixin.py:
##
@@ -120,23 +74,11 @@ def skip(
 return
 
 task_ids_list = [d.task_id for d in task_list]
-SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   The TaskSDK could make an API call dedicated to handling skipping if we want.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951582645


##
airflow/models/skipmixin.py:
##
@@ -120,23 +74,11 @@ def skip(
 return
 
 task_ids_list = [d.task_id for d in task_list]
-SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   I wonder if we even need to preserve this functionality. Overall it looks 
like a shortcut to set tasks to skipped before `NotPreviouslySkippedDep` 
handles it (breaks single responsibility?), and surely it won't work when 
there's no direct access to the DB from the operator.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951582645


##
airflow/models/skipmixin.py:
##
@@ -120,23 +74,11 @@ def skip(
 return
 
 task_ids_list = [d.task_id for d in task_list]
-SkipMixin._set_state_to_skipped(dag_id, run_id, task_ids_list, session)

Review Comment:
   I wonder if we even need to preserve this functionality. It surely won't 
work when there's no direct access to the DB from the operator, and overall it 
looks like a shortcut to set tasks to skipped before `NotPreviouslySkippedDep` 
handles it (breaks single responsibility?).
   If we decide to preserve it, we'll need to signal the task runner with 
exceptions.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951576521


##
airflow/models/skipmixin.py:
##
@@ -120,8 +120,13 @@ def skip(
 return

Review Comment:
   Irrelevant for now






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951575450


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):

Review Comment:
   Implemented the flag as we discussed.
   When working on the tests I'll check that there are no issues with mapped 
operators.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951575706


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> 
ToSupervisor | None:
 msg = early_exit
 return msg
 
-result = _execute_task(context, ti)
+
+try:

Review Comment:
   Reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951574379


##
airflow/exceptions.py:
##
@@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException):
 """Raise when multiple values are found for the same connection ID."""
 
 
+class SkipDownstreamTaskInstances(BaseException):

Review Comment:
   Became irrelevant






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951574100


##
providers/common/compat/src/airflow/providers/common/compat/standard/short_circuit.py:
##
@@ -0,0 +1,36 @@
+import logging
+
+
+def get_tasks_to_skip(log, condition, task, downstream_task_ids, 
ignore_downstream_trigger_rules: bool):
+"""
+Compatibility function for short-circuiting tasks.
+In Airflow v2, it will be utilized by the ShortCircuitOperator.
+In Airflow v3+, it will be utilized by the task_runner.
+"""

Review Comment:
   Became irrelevant






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951573740


##
providers/standard/src/airflow/providers/standard/operators/python.py:
##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: 
bool = True, **kwargs) ->
 
 def execute(self, context: Context) -> Any:
 condition = super().execute(context)
-self.log.info("Condition result is %s", condition)
-
-if condition:
-self.log.info("Proceeding with downstream tasks...")
-return condition
-
-if not self.downstream_task_ids:
-self.log.info("No downstream tasks; nothing to do.")
-return condition
-
-dag_run = context["dag_run"]
-
-def get_tasks_to_skip():
-if self.ignore_downstream_trigger_rules is True:
-tasks = context["task"].get_flat_relatives(upstream=False)
-else:
-tasks = context["task"].get_direct_relatives(upstream=False)
-for t in tasks:
-if not t.is_teardown:
-yield t
-
-to_skip = get_tasks_to_skip()
-
-# this let's us avoid an intermediate list unless debug logging
-if self.log.getEffectiveLevel() <= logging.DEBUG:
-self.log.debug("Downstream task IDs %s", to_skip := 
list(get_tasks_to_skip()))
-
-self.log.info("Skipping downstream tasks")
 if AIRFLOW_V_3_0_PLUS:
-self.skip(
-dag_id=dag_run.dag_id,
-run_id=dag_run.run_id,
-tasks=to_skip,
-map_index=context["ti"].map_index,
-)
+raise SkipDownstreamTaskInstances(condition=condition,
+  
ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+  )

Review Comment:
   Indeed it works fine, so I reverted to the original state.
   I'll raise a discussion about it on the dev list (and maybe even in the next 
dev call), so we can all discuss it publicly.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951451483


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):

Review Comment:
   ~If we retain it - we also need to check whether it's a [mapped 
operator](https://github.com/apache/airflow/pull/44925#discussion_r1884624755), 
otherwise we have issues in that area~
   
   Need to double check this statement






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-11 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1951451483


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):

Review Comment:
   If we retain it - we also need to check whether it's a [mapped 
operator](https://github.com/apache/airflow/pull/44925#discussion_r1884624755), 
otherwise we have issues in that area






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-10 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948851232


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> 
ToSupervisor | None:
 msg = early_exit
 return msg
 
-result = _execute_task(context, ti)
+
+try:

Review Comment:
   > Can we handle this at the same level as `except TaskDeferred as defer:` on 
L583?
   
   I agree that it looks a bit funny - I did it to avoid duplicating the logic 
below the nested `try...except`. I'll wrap it in another function, and then it 
might look a bit better.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-10 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948803054


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> 
ToSupervisor | None:
 msg = early_exit
 return msg
 
-result = _execute_task(context, ti)
+
+try:

Review Comment:
   Can we handle this at the same level as `except TaskDeferred as defer:` on 
L583?
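
   A rough sketch of what handling it at the same level could look like, with 
stand-in exception classes (these are not the real Airflow exceptions, and the 
returned strings are placeholders): one `try` block with sibling `except` 
clauses instead of a `try` nested inside another.

```python
class TaskDeferred(Exception):
    """Stand-in for the deferral signal (not the real Airflow class)."""


class SkipDownstream(Exception):
    """Hypothetical signal asking the runner to skip downstream tasks."""


def run(execute) -> str:
    # Both signals are handled by sibling clauses of a single try block.
    try:
        result = execute()
    except TaskDeferred:
        return "deferred"
    except SkipDownstream:
        return "skip_downstream"
    return f"success:{result}"


def deferring_task():
    raise TaskDeferred()


def skipping_task():
    raise SkipDownstream()
```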






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-10 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948796811


##
airflow/exceptions.py:
##
@@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException):
 """Raise when multiple values are found for the same connection ID."""
 
 
+class SkipDownstreamTaskInstances(BaseException):

Review Comment:
   Yes, may as well do that now since it's a new one.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-09 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948041123


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -564,7 +565,11 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> 
ToSupervisor | None:
 msg = early_exit
 return msg
 
-result = _execute_task(context, ti)
+
+try:

Review Comment:
   This double/nested try-except looks weird 






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-09 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948040857


##
providers/standard/src/airflow/providers/standard/operators/python.py:
##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: 
bool = True, **kwargs) ->
 
 def execute(self, context: Context) -> Any:
 condition = super().execute(context)
-self.log.info("Condition result is %s", condition)
-
-if condition:
-self.log.info("Proceeding with downstream tasks...")
-return condition
-
-if not self.downstream_task_ids:
-self.log.info("No downstream tasks; nothing to do.")
-return condition
-
-dag_run = context["dag_run"]
-
-def get_tasks_to_skip():
-if self.ignore_downstream_trigger_rules is True:
-tasks = context["task"].get_flat_relatives(upstream=False)
-else:
-tasks = context["task"].get_direct_relatives(upstream=False)
-for t in tasks:
-if not t.is_teardown:
-yield t
-
-to_skip = get_tasks_to_skip()
-
-# this let's us avoid an intermediate list unless debug logging
-if self.log.getEffectiveLevel() <= logging.DEBUG:
-self.log.debug("Downstream task IDs %s", to_skip := 
list(get_tasks_to_skip()))
-
-self.log.info("Skipping downstream tasks")
 if AIRFLOW_V_3_0_PLUS:
-self.skip(
-dag_id=dag_run.dag_id,
-run_id=dag_run.run_id,
-tasks=to_skip,
-map_index=context["ti"].map_index,
-)
+raise SkipDownstreamTaskInstances(condition=condition,
+  
ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+  )

Review Comment:
   Moving it makes sense, but operators can still set XComs; `ti.xcom_push` 
works fine.






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-09 Thread via GitHub


ashb commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1948040226


##
airflow/ti_deps/deps/not_previously_skipped_dep.py:
##
@@ -49,46 +48,47 @@ def _get_dep_statuses(self, ti, session, dep_context):
 finished_task_ids = {t.task_id for t in finished_tis}
 
 for parent in upstream:
-if isinstance(parent, SkipMixin):

Review Comment:
   Removing this means the scheduler has to check this for all operators, where 
it didn't before. 
   
   Given this is in the hottest of hot paths in the scheduler, we might need to 
rethink this: perhaps we store a flag in the serialized DAG like we did for 
`is_empty`.
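
   A minimal sketch of the flag idea, with stand-in classes and hypothetical 
helper names (`serialize_node` here is a toy function, not the real serializer): 
the class check runs once at serialization time, and the scheduler-side loop 
only reads a stored boolean.

```python
from typing import Any


class SkipMixin:
    """Stand-in marker class (not the real Airflow SkipMixin)."""


class PlainOperator:
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class SkippingOperator(PlainOperator, SkipMixin):
    """Hypothetical operator that may skip its downstream tasks."""


def serialize_node(op: PlainOperator) -> dict[str, Any]:
    # The class check happens once, at serialization time.
    return {"task_id": op.task_id, "_is_skip_mixin": isinstance(op, SkipMixin)}


def parents_to_check(serialized_upstream: list[dict[str, Any]]) -> list[str]:
    # Scheduler side: only a stored boolean is read, no class hierarchy needed.
    return [n["task_id"] for n in serialized_upstream if n.get("_is_skip_mixin", False)]
```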






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935272


##
providers/standard/src/airflow/providers/standard/operators/python.py:
##
@@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: 
bool = True, **kwargs) ->
 
 def execute(self, context: Context) -> Any:
 condition = super().execute(context)
-self.log.info("Condition result is %s", condition)
-
-if condition:
-self.log.info("Proceeding with downstream tasks...")
-return condition
-
-if not self.downstream_task_ids:
-self.log.info("No downstream tasks; nothing to do.")
-return condition
-
-dag_run = context["dag_run"]
-
-def get_tasks_to_skip():
-if self.ignore_downstream_trigger_rules is True:
-tasks = context["task"].get_flat_relatives(upstream=False)
-else:
-tasks = context["task"].get_direct_relatives(upstream=False)
-for t in tasks:
-if not t.is_teardown:
-yield t
-
-to_skip = get_tasks_to_skip()
-
-# this let's us avoid an intermediate list unless debug logging
-if self.log.getEffectiveLevel() <= logging.DEBUG:
-self.log.debug("Downstream task IDs %s", to_skip := 
list(get_tasks_to_skip()))
-
-self.log.info("Skipping downstream tasks")
 if AIRFLOW_V_3_0_PLUS:
-self.skip(
-dag_id=dag_run.dag_id,
-run_id=dag_run.run_id,
-tasks=to_skip,
-map_index=context["ti"].map_index,
-)
+raise SkipDownstreamTaskInstances(condition=condition,
+  
ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules
+  )

Review Comment:
   I think it makes more sense in the new abstraction to handle the 
"skipping" logic in the task runner (or somewhere else in the Task SDK?) rather 
than in the standard operator itself, since the operator cannot set XComs directly.
   
   The operator runs some logic that results in a boolean value. Once it 
signals the task runner, the runner sets the XCom with the list of all 
affected tasks. The runner already has all the information required for that, 
except for the condition (which dictates whether to skip at all) and the 
`ignore_downstream_trigger_rules` parameter.
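
   To make the idea concrete, here is a rough sketch of the signaling flow in 
plain Python, with no Airflow imports. Only the exception name and its two 
keyword arguments come from the diff above; the toy `DOWNSTREAM` graph, 
`flat_downstream`, `run_task` and `short_circuit` are hypothetical names made 
up for illustration, and teardown handling is left out. It is not the actual 
task runner code.

```python
from __future__ import annotations


class SkipDownstreamTaskInstances(BaseException):
    """Signal raised by the operator; name and kwargs mirror the PR diff."""

    def __init__(self, *, condition, ignore_downstream_trigger_rules: bool):
        super().__init__()
        self.condition = condition
        self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules


# Hypothetical toy DAG: task_id -> direct downstream task_ids.
DOWNSTREAM = {"check": ["a", "b"], "a": ["c"], "b": [], "c": []}


def flat_downstream(task_id: str) -> list[str]:
    """Collect all transitive downstream task IDs without duplicates."""
    seen: list[str] = []
    stack = list(reversed(DOWNSTREAM[task_id]))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.append(current)
            stack.extend(reversed(DOWNSTREAM[current]))
    return seen


def run_task(task_id: str, execute):
    """Hypothetical runner: returns (result, task_ids_to_skip)."""
    try:
        return execute(), []
    except SkipDownstreamTaskInstances as signal:
        if signal.condition:
            # Truthy condition: nothing is skipped.
            return signal.condition, []
        if signal.ignore_downstream_trigger_rules:
            # Skip everything downstream, not only direct children.
            return signal.condition, flat_downstream(task_id)
        return signal.condition, list(DOWNSTREAM[task_id])


def short_circuit(condition, ignore_downstream_trigger_rules: bool = True):
    """Stand-in for the operator's execute(): it only raises the signal."""

    def execute():
        raise SkipDownstreamTaskInstances(
            condition=condition,
            ignore_downstream_trigger_rules=ignore_downstream_trigger_rules,
        )

    return execute
```

   In this sketch the operator side never touches XComs or task instances; 
the runner is the only place that decides which task IDs are affected, which 
is the separation argued for above.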






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947935668


##
providers/common/compat/src/airflow/providers/common/compat/standard/short_circuit.py:
##
@@ -0,0 +1,36 @@
+import logging
+
+
+def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool):
+    """
+    Compatibility function for short-circuiting tasks.
+    In Airflow v2, it will be utilized by the ShortCircuitOperator.
+    In Airflow v3+, it will be utilized by the task_runner.
+    """

Review Comment:
   Important for preserving backward compatibility, if my idea of moving the 
logic to the task runner is accepted.
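
   For reference, a guess at what such a shared helper could look like, 
pieced together from the lines removed from `ShortCircuitOperator.execute` in 
this PR. The signature matches the diff above, but the body and the list 
return type are assumptions, and `FakeTask` is a hypothetical stand-in used 
only so the sketch runs without Airflow.

```python
import logging
from dataclasses import dataclass, field


@dataclass(eq=False)
class FakeTask:
    """Hypothetical stand-in for an Airflow task, only for this sketch."""

    task_id: str
    is_teardown: bool = False
    direct: list = field(default_factory=list)

    def get_direct_relatives(self, upstream=False):
        return list(self.direct)

    def get_flat_relatives(self, upstream=False):
        found = []
        for child in self.direct:
            for t in [child, *child.get_flat_relatives(upstream)]:
                if t not in found:
                    found.append(t)
        return found


def get_tasks_to_skip(log, condition, task, downstream_task_ids, ignore_downstream_trigger_rules: bool):
    """Return the tasks to skip; an empty list means nothing is skipped."""
    log.info("Condition result is %s", condition)
    if condition:
        log.info("Proceeding with downstream tasks...")
        return []
    if not downstream_task_ids:
        log.info("No downstream tasks; nothing to do.")
        return []
    if ignore_downstream_trigger_rules:
        tasks = task.get_flat_relatives(upstream=False)
    else:
        tasks = task.get_direct_relatives(upstream=False)
    # Teardown tasks are never skipped, as in the removed operator code.
    to_skip = [t for t in tasks if not t.is_teardown]
    log.debug("Downstream task IDs %s", [t.task_id for t in to_skip])
    log.info("Skipping downstream tasks")
    return to_skip
```

   Both callers would pass in their own logger, task and flag, so the Airflow 
v2 operator and the v3 task runner could share the same selection logic.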






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936403


##
newsfragments/44925.bugfix.rst:
##
@@ -0,0 +1 @@
+Fix short circuit operator in mapped tasks

Review Comment:
   I'll fix it later






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936092


##
airflow/models/skipmixin.py:
##
@@ -120,8 +120,13 @@ def skip(
 return

Review Comment:
   If my suggestion to relocate the "skipping" logic to the task_runner 
makes sense, I think that both `SkipMixin` and `BranchMixin` will eventually be 
deleted (you can ignore the changes here, they're from #44925).






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947936146


##
airflow/exceptions.py:
##
@@ -395,6 +395,19 @@ class ConnectionNotUnique(AirflowException):
 """Raise when multiple values are found for the same connection ID."""
 
 
+class SkipDownstreamTaskInstances(BaseException):

Review Comment:
   Should I relocate this exception to the SDK? (the other exceptions here are 
also used there)






Re: [PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 commented on code in PR #46584:
URL: https://github.com/apache/airflow/pull/46584#discussion_r1947835723


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -564,11 +565,27 @@ def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
 msg = early_exit
 return msg
 
-result = _execute_task(context, ti)
+# TODO: looks a bit forced, couldn't come up with a better way that won't require a lot of changes

Review Comment:
   If you have any ideas for implementing this in a better way given the 
existing limitations, I'd be happy to hear suggestions.






[PR] [WIP] Implement short circuit operator [airflow]

2025-02-08 Thread via GitHub


shahar1 opened a new pull request, #46584:
URL: https://github.com/apache/airflow/pull/46584

   
   
   
   
   
   
   
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   

