ashb commented on code in PR #46584: URL: https://github.com/apache/airflow/pull/46584#discussion_r1948040857
########## providers/standard/src/airflow/providers/standard/operators/python.py: ########## @@ -272,48 +274,25 @@ def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> def execute(self, context: Context) -> Any: condition = super().execute(context) - self.log.info("Condition result is %s", condition) - - if condition: - self.log.info("Proceeding with downstream tasks...") - return condition - - if not self.downstream_task_ids: - self.log.info("No downstream tasks; nothing to do.") - return condition - - dag_run = context["dag_run"] - - def get_tasks_to_skip(): - if self.ignore_downstream_trigger_rules is True: - tasks = context["task"].get_flat_relatives(upstream=False) - else: - tasks = context["task"].get_direct_relatives(upstream=False) - for t in tasks: - if not t.is_teardown: - yield t - - to_skip = get_tasks_to_skip() - - # this let's us avoid an intermediate list unless debug logging - if self.log.getEffectiveLevel() <= logging.DEBUG: - self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) - - self.log.info("Skipping downstream tasks") if AIRFLOW_V_3_0_PLUS: - self.skip( - dag_id=dag_run.dag_id, - run_id=dag_run.run_id, - tasks=to_skip, - map_index=context["ti"].map_index, - ) + raise SkipDownstreamTaskInstances(condition=condition, + ignore_downstream_trigger_rules=self.ignore_downstream_trigger_rules + ) Review Comment: Moving it makes sense, but operators can set xcom still, `ti.xcom_push` works fine -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org